From 423a9dea781499ea71eb752fc5c21336b5b289c5 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 11 May 2016 17:20:16 -0700 Subject: [PATCH] Change getStats to do reseting if required. --- benchmark/worker/benchmark_client.go | 61 ++++++++++++++++------------ benchmark/worker/benchmark_server.go | 16 ++++---- benchmark/worker/main.go | 14 ++----- 3 files changed, 46 insertions(+), 45 deletions(-) diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go index 74304f33..37b9ae24 100644 --- a/benchmark/worker/benchmark_client.go +++ b/benchmark/worker/benchmark_client.go @@ -327,35 +327,49 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou } } -func (bc *benchmarkClient) getStats() *testpb.ClientStats { +// getStats return the stats for benchmark client. +// It resets lastResetTime and all histograms if argument reset is true. +func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats { timeElapsed := time.Since(bc.lastResetTime).Seconds() + mergedHistogram := stats.NewHistogram(bc.histogramOptions) - // Merging histogram may take some time. - // Put all histograms aside and merge later. - toMerge := make([]*stats.Histogram, len(bc.histograms)) - for i := range bc.histograms { - bc.mutexes[i].Lock() - toMerge[i] = bc.histograms[i] - bc.histograms[i] = stats.NewHistogram(bc.histogramOptions) - bc.mutexes[i].Unlock() + if reset { + bc.lastResetTime = time.Now() + + // Merging histogram may take some time. + // Put all histograms aside and merge later. + toMerge := make([]*stats.Histogram, len(bc.histograms)) + for i := range bc.histograms { + bc.mutexes[i].Lock() + toMerge[i] = bc.histograms[i] + bc.histograms[i] = stats.NewHistogram(bc.histogramOptions) + bc.mutexes[i].Unlock() + } + + for i := 0; i < len(toMerge); i++ { + mergedHistogram.Merge(toMerge[i]) + } + } else { + // Only merging histograms, not resetting. + for i := range bc.histograms { + bc.mutexes[i].Lock() + mergedHistogram.Merge(bc.histograms[i]) + bc.mutexes[i].Unlock() + } } - for i := 1; i < len(toMerge); i++ { - toMerge[0].Merge(toMerge[i]) - } - - b := make([]uint32, len(toMerge[0].Buckets)) - for i, v := range toMerge[0].Buckets { + b := make([]uint32, len(mergedHistogram.Buckets)) + for i, v := range mergedHistogram.Buckets { b[i] = uint32(v.Count) } return &testpb.ClientStats{ Latencies: &testpb.HistogramData{ Bucket: b, - MinSeen: float64(toMerge[0].Min), - MaxSeen: float64(toMerge[0].Max), - Sum: float64(toMerge[0].Sum), - SumOfSquares: float64(toMerge[0].SumOfSquares), - Count: float64(toMerge[0].Count), + MinSeen: float64(mergedHistogram.Min), + MaxSeen: float64(mergedHistogram.Max), + Sum: float64(mergedHistogram.Sum), + SumOfSquares: float64(mergedHistogram.SumOfSquares), + Count: float64(mergedHistogram.Count), }, TimeElapsed: timeElapsed, TimeUser: 0, @@ -363,13 +377,6 @@ func (bc *benchmarkClient) getStats() *testpb.ClientStats { } } -// reset clears the contents for histogram and set lastResetTime to Now(). -// It is called to get ready for benchmark runs. -func (bc *benchmarkClient) reset() { - bc.lastResetTime = time.Now() - bc.histograms[0].Clear() -} - func (bc *benchmarkClient) shutdown() { close(bc.stop) bc.closeConns() diff --git a/benchmark/worker/benchmark_server.go b/benchmark/worker/benchmark_server.go index 7ccb069c..f0e38e03 100644 --- a/benchmark/worker/benchmark_server.go +++ b/benchmark/worker/benchmark_server.go @@ -158,15 +158,15 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma return &benchmarkServer{port: p, cores: numOfCores, closeFunc: closeFunc, lastResetTime: time.Now()}, nil } -func (bs *benchmarkServer) getStats() *testpb.ServerStats { +// getStats return the stats for benchmark server. +// It resets lastResetTime if argument reset is true. +func (bs *benchmarkServer) getStats(reset bool) *testpb.ServerStats { // TODO wall time, sys time, user time. bs.mu.RLock() defer bs.mu.RUnlock() - return &testpb.ServerStats{TimeElapsed: time.Since(bs.lastResetTime).Seconds(), TimeUser: 0, TimeSystem: 0} -} - -func (bs *benchmarkServer) reset() { - bs.mu.Lock() - defer bs.mu.Unlock() - bs.lastResetTime = time.Now() + timeElapsed := time.Since(bs.lastResetTime).Seconds() + if reset { + bs.lastResetTime = time.Now() + } + return &testpb.ServerStats{TimeElapsed: timeElapsed, TimeUser: 0, TimeSystem: 0} } diff --git a/benchmark/worker/main.go b/benchmark/worker/main.go index 236ca8df..8fc2160c 100644 --- a/benchmark/worker/main.go +++ b/benchmark/worker/main.go @@ -116,7 +116,7 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er return err } out = &testpb.ServerStatus{ - Stats: bs.getStats(), + Stats: bs.getStats(false), Port: int32(bs.port), Cores: int32(bs.cores), } @@ -128,13 +128,10 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er return grpc.Errorf(codes.InvalidArgument, "server does not exist when mark received") } out = &testpb.ServerStatus{ - Stats: bs.getStats(), + Stats: bs.getStats(argtype.Mark.Reset_), Port: int32(bs.port), Cores: int32(bs.cores), } - if argtype.Mark.Reset_ { - bs.reset() - } } if err := stream.Send(out); err != nil { @@ -176,7 +173,7 @@ func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) er return err } out = &testpb.ClientStatus{ - Stats: bc.getStats(), + Stats: bc.getStats(false), } case *testpb.ClientArgs_Mark: @@ -186,10 +183,7 @@ func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) er return grpc.Errorf(codes.InvalidArgument, "client does not exist when mark received") } out = &testpb.ClientStatus{ - Stats: bc.getStats(), - } - if t.Mark.Reset_ { - bc.reset() + Stats: bc.getStats(t.Mark.Reset_), } }