get more metrics from go benchmark servers (#913)
* add user and system cpu usage to go benchmarks * fix import and style issues * sample elapsed time diffs after merging histograms * style fixes and variable renames * add pprof server to benchmark workers to grab different profile stats * rename variables for consistency and default to no pprof
This commit is contained in:
@ -37,6 +37,7 @@ import (
|
|||||||
"math"
|
"math"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
@ -85,6 +86,7 @@ type benchmarkClient struct {
|
|||||||
lastResetTime time.Time
|
lastResetTime time.Time
|
||||||
histogramOptions stats.HistogramOptions
|
histogramOptions stats.HistogramOptions
|
||||||
lockingHistograms []lockingHistogram
|
lockingHistograms []lockingHistogram
|
||||||
|
rusageLastReset *syscall.Rusage
|
||||||
}
|
}
|
||||||
|
|
||||||
func printClientConfig(config *testpb.ClientConfig) {
|
func printClientConfig(config *testpb.ClientConfig) {
|
||||||
@ -226,6 +228,9 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error)
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rusage := new(syscall.Rusage)
|
||||||
|
syscall.Getrusage(syscall.RUSAGE_SELF, rusage)
|
||||||
|
|
||||||
rpcCountPerConn := int(config.OutstandingRpcsPerChannel)
|
rpcCountPerConn := int(config.OutstandingRpcsPerChannel)
|
||||||
bc := &benchmarkClient{
|
bc := &benchmarkClient{
|
||||||
histogramOptions: stats.HistogramOptions{
|
histogramOptions: stats.HistogramOptions{
|
||||||
@ -236,9 +241,10 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error)
|
|||||||
},
|
},
|
||||||
lockingHistograms: make([]lockingHistogram, rpcCountPerConn*len(conns), rpcCountPerConn*len(conns)),
|
lockingHistograms: make([]lockingHistogram, rpcCountPerConn*len(conns), rpcCountPerConn*len(conns)),
|
||||||
|
|
||||||
stop: make(chan bool),
|
stop: make(chan bool),
|
||||||
lastResetTime: time.Now(),
|
lastResetTime: time.Now(),
|
||||||
closeConns: closeConns,
|
closeConns: closeConns,
|
||||||
|
rusageLastReset: rusage,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = performRPCs(config, conns, bc); err != nil {
|
if err = performRPCs(config, conns, bc); err != nil {
|
||||||
@ -338,8 +344,9 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
|
|||||||
// getStats returns the stats for benchmark client.
|
// getStats returns the stats for benchmark client.
|
||||||
// It resets lastResetTime and all histograms if argument reset is true.
|
// It resets lastResetTime and all histograms if argument reset is true.
|
||||||
func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats {
|
func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats {
|
||||||
var timeElapsed float64
|
var wallTimeElapsed, uTimeElapsed, sTimeElapsed float64
|
||||||
mergedHistogram := stats.NewHistogram(bc.histogramOptions)
|
mergedHistogram := stats.NewHistogram(bc.histogramOptions)
|
||||||
|
latestRusage := new(syscall.Rusage)
|
||||||
|
|
||||||
if reset {
|
if reset {
|
||||||
// Merging histogram may take some time.
|
// Merging histogram may take some time.
|
||||||
@ -353,14 +360,21 @@ func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats {
|
|||||||
mergedHistogram.Merge(toMerge[i])
|
mergedHistogram.Merge(toMerge[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
timeElapsed = time.Since(bc.lastResetTime).Seconds()
|
wallTimeElapsed = time.Since(bc.lastResetTime).Seconds()
|
||||||
|
syscall.Getrusage(syscall.RUSAGE_SELF, latestRusage)
|
||||||
|
uTimeElapsed, sTimeElapsed = cpuTimeDiff(bc.rusageLastReset, latestRusage)
|
||||||
|
|
||||||
|
bc.rusageLastReset = latestRusage
|
||||||
bc.lastResetTime = time.Now()
|
bc.lastResetTime = time.Now()
|
||||||
} else {
|
} else {
|
||||||
// Merge only, not reset.
|
// Merge only, not reset.
|
||||||
for i := range bc.lockingHistograms {
|
for i := range bc.lockingHistograms {
|
||||||
bc.lockingHistograms[i].mergeInto(mergedHistogram)
|
bc.lockingHistograms[i].mergeInto(mergedHistogram)
|
||||||
}
|
}
|
||||||
timeElapsed = time.Since(bc.lastResetTime).Seconds()
|
|
||||||
|
wallTimeElapsed = time.Since(bc.lastResetTime).Seconds()
|
||||||
|
syscall.Getrusage(syscall.RUSAGE_SELF, latestRusage)
|
||||||
|
uTimeElapsed, sTimeElapsed = cpuTimeDiff(bc.rusageLastReset, latestRusage)
|
||||||
}
|
}
|
||||||
|
|
||||||
b := make([]uint32, len(mergedHistogram.Buckets), len(mergedHistogram.Buckets))
|
b := make([]uint32, len(mergedHistogram.Buckets), len(mergedHistogram.Buckets))
|
||||||
@ -376,9 +390,9 @@ func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats {
|
|||||||
SumOfSquares: float64(mergedHistogram.SumOfSquares),
|
SumOfSquares: float64(mergedHistogram.SumOfSquares),
|
||||||
Count: float64(mergedHistogram.Count),
|
Count: float64(mergedHistogram.Count),
|
||||||
},
|
},
|
||||||
TimeElapsed: timeElapsed,
|
TimeElapsed: wallTimeElapsed,
|
||||||
TimeUser: 0,
|
TimeUser: uTimeElapsed,
|
||||||
TimeSystem: 0,
|
TimeSystem: sTimeElapsed,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,6 +38,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
@ -55,11 +56,12 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// benchmarkServer holds the bookkeeping for one running benchmark server:
// where it listens, how to shut it down, and the baselines that getStats
// measures elapsed wall-clock and CPU time against.
type benchmarkServer struct {
	// port is the port number parsed from the server's listening address.
	port int
	// cores is the core count recorded when the server was started.
	cores int
	// closeFunc is stored at start-up; presumably it shuts the server down —
	// TODO confirm against the caller.
	closeFunc func()

	// mu is held by getStats while it reads and updates the baselines below.
	mu sync.RWMutex
	// lastResetTime is the wall-clock instant of the last stats reset.
	lastResetTime time.Time
	// rusageLastReset is the resource-usage snapshot taken at the last reset.
	rusageLastReset *syscall.Rusage
}
|
||||||
|
|
||||||
func printServerConfig(config *testpb.ServerConfig) {
|
func printServerConfig(config *testpb.ServerConfig) {
|
||||||
@ -156,18 +158,35 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma
|
|||||||
grpclog.Fatalf("failed to get port number from server address: %v", err)
|
grpclog.Fatalf("failed to get port number from server address: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &benchmarkServer{port: p, cores: numOfCores, closeFunc: closeFunc, lastResetTime: time.Now()}, nil
|
rusage := new(syscall.Rusage)
|
||||||
|
syscall.Getrusage(syscall.RUSAGE_SELF, rusage)
|
||||||
|
|
||||||
|
return &benchmarkServer{
|
||||||
|
port: p,
|
||||||
|
cores: numOfCores,
|
||||||
|
closeFunc: closeFunc,
|
||||||
|
lastResetTime: time.Now(),
|
||||||
|
rusageLastReset: rusage,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getStats returns the stats for benchmark server.
|
// getStats returns the stats for benchmark server.
|
||||||
// It resets lastResetTime if argument reset is true.
|
// It resets lastResetTime if argument reset is true.
|
||||||
func (bs *benchmarkServer) getStats(reset bool) *testpb.ServerStats {
|
func (bs *benchmarkServer) getStats(reset bool) *testpb.ServerStats {
|
||||||
// TODO wall time, sys time, user time.
|
|
||||||
bs.mu.RLock()
|
bs.mu.RLock()
|
||||||
defer bs.mu.RUnlock()
|
defer bs.mu.RUnlock()
|
||||||
timeElapsed := time.Since(bs.lastResetTime).Seconds()
|
wallTimeElapsed := time.Since(bs.lastResetTime).Seconds()
|
||||||
|
rusageLatest := new(syscall.Rusage)
|
||||||
|
syscall.Getrusage(syscall.RUSAGE_SELF, rusageLatest)
|
||||||
|
uTimeElapsed, sTimeElapsed := cpuTimeDiff(bs.rusageLastReset, rusageLatest)
|
||||||
|
|
||||||
if reset {
|
if reset {
|
||||||
bs.lastResetTime = time.Now()
|
bs.lastResetTime = time.Now()
|
||||||
|
bs.rusageLastReset = rusageLatest
|
||||||
|
}
|
||||||
|
return &testpb.ServerStats{
|
||||||
|
TimeElapsed: wallTimeElapsed,
|
||||||
|
TimeUser: uTimeElapsed,
|
||||||
|
TimeSystem: sTimeElapsed,
|
||||||
}
|
}
|
||||||
return &testpb.ServerStats{TimeElapsed: timeElapsed, TimeUser: 0, TimeSystem: 0}
|
|
||||||
}
|
}
|
||||||
|
@ -38,6 +38,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
|
"net/http"
|
||||||
|
_ "net/http/pprof"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
@ -50,8 +52,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Command-line flags for the benchmark worker.
var (
	// Ports the worker listens on.
	driverPort = flag.Int("driver_port", 10000, "port for communication with driver")
	serverPort = flag.Int("server_port", 0, "port for benchmark server if not specified by server config message")

	// Profiling controls. main starts the pprof server only when pprofPort is
	// non-negative, and passes blockProfRate to runtime.SetBlockProfileRate.
	pprofPort     = flag.Int("pprof_port", -1, "Port for pprof debug server to listen on. Pprof server doesn't start if unset")
	blockProfRate = flag.Int("block_prof_rate", 0, "fraction of goroutine blocking events to report in blocking profile")
)
|
||||||
|
|
||||||
type byteBufCodec struct {
|
type byteBufCodec struct {
|
||||||
@ -227,5 +231,14 @@ func main() {
|
|||||||
s.Stop()
|
s.Stop()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
runtime.SetBlockProfileRate(*blockProfRate)
|
||||||
|
|
||||||
|
if *pprofPort >= 0 {
|
||||||
|
go func() {
|
||||||
|
grpclog.Println("Starting pprof server on port " + strconv.Itoa(*pprofPort))
|
||||||
|
grpclog.Println(http.ListenAndServe("localhost:"+strconv.Itoa(*pprofPort), nil))
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
s.Serve(lis)
|
s.Serve(lis)
|
||||||
}
|
}
|
||||||
|
@ -36,6 +36,7 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"syscall"
|
||||||
)
|
)
|
||||||
|
|
||||||
// abs returns the absolute path the given relative file or directory path,
|
// abs returns the absolute path the given relative file or directory path,
|
||||||
@ -52,6 +53,20 @@ func abs(rel string) string {
|
|||||||
return filepath.Join(v, rel)
|
return filepath.Join(v, rel)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cpuTimeDiff returns the user and system CPU time, in seconds, that elapsed
// between two resource-usage snapshots. first is the earlier snapshot and
// latest the later one. If either snapshot is nil there is nothing to
// compare, so both results are zero instead of a nil-pointer panic.
func cpuTimeDiff(first *syscall.Rusage, latest *syscall.Rusage) (uTimeElapsed, sTimeElapsed float64) {
	if first == nil || latest == nil {
		return 0, 0
	}

	// Seconds and microseconds are subtracted as integers before converting.
	// The microsecond difference may be negative; it then borrows from the
	// seconds difference once both are summed.
	elapsed := func(from, to syscall.Timeval) float64 {
		return float64(to.Sec-from.Sec) + float64(to.Usec-from.Usec)*1.0e-6
	}

	return elapsed(first.Utime, latest.Utime), elapsed(first.Stime, latest.Stime)
}
|
||||||
|
|
||||||
func goPackagePath(pkg string) (path string, err error) {
|
func goPackagePath(pkg string) (path string, err error) {
|
||||||
gp := os.Getenv("GOPATH")
|
gp := os.Getenv("GOPATH")
|
||||||
if gp == "" {
|
if gp == "" {
|
||||||
|
Reference in New Issue
Block a user