Change getStats to do reseting if required.

This commit is contained in:
Menghan Li
2016-05-11 17:20:16 -07:00
parent 9521f8ddae
commit 423a9dea78
3 changed files with 46 additions and 45 deletions

View File

@ -327,8 +327,14 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
}
}
func (bc *benchmarkClient) getStats() *testpb.ClientStats {
// getStats return the stats for benchmark client.
// It resets lastResetTime and all histograms if argument reset is true.
func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats {
timeElapsed := time.Since(bc.lastResetTime).Seconds()
mergedHistogram := stats.NewHistogram(bc.histogramOptions)
if reset {
bc.lastResetTime = time.Now()
// Merging histogram may take some time.
// Put all histograms aside and merge later.
@ -340,22 +346,30 @@ func (bc *benchmarkClient) getStats() *testpb.ClientStats {
bc.mutexes[i].Unlock()
}
for i := 1; i < len(toMerge); i++ {
toMerge[0].Merge(toMerge[i])
for i := 0; i < len(toMerge); i++ {
mergedHistogram.Merge(toMerge[i])
}
} else {
// Only merging histograms, not resetting.
for i := range bc.histograms {
bc.mutexes[i].Lock()
mergedHistogram.Merge(bc.histograms[i])
bc.mutexes[i].Unlock()
}
}
b := make([]uint32, len(toMerge[0].Buckets))
for i, v := range toMerge[0].Buckets {
b := make([]uint32, len(mergedHistogram.Buckets))
for i, v := range mergedHistogram.Buckets {
b[i] = uint32(v.Count)
}
return &testpb.ClientStats{
Latencies: &testpb.HistogramData{
Bucket: b,
MinSeen: float64(toMerge[0].Min),
MaxSeen: float64(toMerge[0].Max),
Sum: float64(toMerge[0].Sum),
SumOfSquares: float64(toMerge[0].SumOfSquares),
Count: float64(toMerge[0].Count),
MinSeen: float64(mergedHistogram.Min),
MaxSeen: float64(mergedHistogram.Max),
Sum: float64(mergedHistogram.Sum),
SumOfSquares: float64(mergedHistogram.SumOfSquares),
Count: float64(mergedHistogram.Count),
},
TimeElapsed: timeElapsed,
TimeUser: 0,
@ -363,13 +377,6 @@ func (bc *benchmarkClient) getStats() *testpb.ClientStats {
}
}
// reset clears the contents for histogram and set lastResetTime to Now().
// It is called to get ready for benchmark runs.
func (bc *benchmarkClient) reset() {
bc.lastResetTime = time.Now()
bc.histograms[0].Clear()
}
func (bc *benchmarkClient) shutdown() {
close(bc.stop)
bc.closeConns()

View File

@ -158,15 +158,15 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma
return &benchmarkServer{port: p, cores: numOfCores, closeFunc: closeFunc, lastResetTime: time.Now()}, nil
}
func (bs *benchmarkServer) getStats() *testpb.ServerStats {
// getStats return the stats for benchmark server.
// It resets lastResetTime if argument reset is true.
func (bs *benchmarkServer) getStats(reset bool) *testpb.ServerStats {
// TODO wall time, sys time, user time.
bs.mu.RLock()
defer bs.mu.RUnlock()
return &testpb.ServerStats{TimeElapsed: time.Since(bs.lastResetTime).Seconds(), TimeUser: 0, TimeSystem: 0}
}
func (bs *benchmarkServer) reset() {
bs.mu.Lock()
defer bs.mu.Unlock()
timeElapsed := time.Since(bs.lastResetTime).Seconds()
if reset {
bs.lastResetTime = time.Now()
}
return &testpb.ServerStats{TimeElapsed: timeElapsed, TimeUser: 0, TimeSystem: 0}
}

View File

@ -116,7 +116,7 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er
return err
}
out = &testpb.ServerStatus{
Stats: bs.getStats(),
Stats: bs.getStats(false),
Port: int32(bs.port),
Cores: int32(bs.cores),
}
@ -128,13 +128,10 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er
return grpc.Errorf(codes.InvalidArgument, "server does not exist when mark received")
}
out = &testpb.ServerStatus{
Stats: bs.getStats(),
Stats: bs.getStats(argtype.Mark.Reset_),
Port: int32(bs.port),
Cores: int32(bs.cores),
}
if argtype.Mark.Reset_ {
bs.reset()
}
}
if err := stream.Send(out); err != nil {
@ -176,7 +173,7 @@ func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) er
return err
}
out = &testpb.ClientStatus{
Stats: bc.getStats(),
Stats: bc.getStats(false),
}
case *testpb.ClientArgs_Mark:
@ -186,10 +183,7 @@ func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) er
return grpc.Errorf(codes.InvalidArgument, "client does not exist when mark received")
}
out = &testpb.ClientStatus{
Stats: bc.getStats(),
}
if t.Mark.Reset_ {
bc.reset()
Stats: bc.getStats(t.Mark.Reset_),
}
}