From e1243883d65512f5699eb8c6a99f0059fda10058 Mon Sep 17 00:00:00 2001
From: Menghan Li
Date: Fri, 6 May 2016 14:49:12 -0700
Subject: [PATCH] Merge histograms into one histogram in getStats

---
 benchmark/worker/benchmark_client.go | 52 +++++++++++++++++-----------
 1 file changed, 32 insertions(+), 20 deletions(-)

diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go
index 9c4c7a64..03468950 100644
--- a/benchmark/worker/benchmark_client.go
+++ b/benchmark/worker/benchmark_client.go
@@ -235,7 +235,7 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe
 			bc.mutexes[idx] = new(sync.RWMutex)
 			bc.histograms[idx] = stats.NewHistogram(bc.histogramOptions)
 			// Start goroutine on the created mutex and histogram.
-			go func(mu *sync.RWMutex, histogram *stats.Histogram) {
+			go func(idx int) {
 				// TODO: do warm up if necessary.
 				// Now relying on worker client to reserve time to do warm up.
 				// The worker client needs to wait for some time after client is created,
@@ -252,9 +252,9 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe
 							return
 						}
 						elapse := time.Since(start)
-						mu.Lock()
-						histogram.Add(int64(elapse))
-						mu.Unlock()
+						bc.mutexes[idx].Lock()
+						bc.histograms[idx].Add(int64(elapse))
+						bc.mutexes[idx].Unlock()
 						select {
 						case <-bc.stop:
 						case done <- true:
@@ -266,7 +266,7 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe
 					case <-done:
 					}
 				}
-			}(bc.mutexes[idx], bc.histograms[idx])
+			}(idx)
 		}
 	}
 }
@@ -291,7 +291,7 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
 			bc.mutexes[idx] = new(sync.RWMutex)
 			bc.histograms[idx] = stats.NewHistogram(bc.histogramOptions)
 			// Start goroutine on the created mutex and histogram.
-			go func(mu *sync.RWMutex, histogram *stats.Histogram) {
+			go func(idx int) {
 				// TODO: do warm up if necessary.
 				// Now relying on worker client to reserve time to do warm up.
 				// The worker client needs to wait for some time after client is created,
@@ -308,9 +308,9 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
 							return
 						}
 						elapse := time.Since(start)
-						mu.Lock()
-						histogram.Add(int64(elapse))
-						mu.Unlock()
+						bc.mutexes[idx].Lock()
+						bc.histograms[idx].Add(int64(elapse))
+						bc.mutexes[idx].Unlock()
 						select {
 						case <-bc.stop:
 						case done <- true:
@@ -322,7 +322,7 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
 					case <-done:
 					}
 				}
-			}(bc.mutexes[idx], bc.histograms[idx])
+			}(idx)
 		}
 	}
 }
@@ -330,20 +330,32 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
 func (bc *benchmarkClient) getStats() *testpb.ClientStats {
 	timeElapsed := time.Since(bc.lastResetTime).Seconds()
 
-	// TODO merge histograms.
-	b := make([]uint32, len(bc.histograms[0].Buckets))
-	var totalcount int64
-	for _, h := range bc.histograms {
-		totalcount += h.Count
+	// Merging histogram may take some time.
+	// Put all histograms aside and merge later.
+	toMerge := make([]*stats.Histogram, len(bc.histograms))
+	for i := range bc.histograms {
+		bc.mutexes[i].Lock()
+		toMerge[i] = bc.histograms[i]
+		bc.histograms[i] = stats.NewHistogram(bc.histogramOptions)
+		bc.mutexes[i].Unlock()
+	}
+
+	for i := 1; i < len(toMerge); i++ {
+		toMerge[0].Merge(toMerge[i])
+	}
+
+	b := make([]uint32, len(toMerge[0].Buckets))
+	for i, v := range toMerge[0].Buckets {
+		b[i] = uint32(v.Count)
 	}
 	return &testpb.ClientStats{
 		Latencies: &testpb.HistogramData{
 			Bucket:       b,
-			MinSeen:      float64(bc.histograms[0].Min),
-			MaxSeen:      float64(bc.histograms[0].Max),
-			Sum:          float64(bc.histograms[0].Sum),
-			SumOfSquares: float64(bc.histograms[0].SumOfSquares),
-			Count:        float64(totalcount),
+			MinSeen:      float64(toMerge[0].Min),
+			MaxSeen:      float64(toMerge[0].Max),
+			Sum:          float64(toMerge[0].Sum),
+			SumOfSquares: float64(toMerge[0].SumOfSquares),
+			Count:        float64(toMerge[0].Count),
 		},
 		TimeElapsed: timeElapsed,
 		TimeUser:    0,
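The getStats change above boils down to a swap-and-merge pattern: each worker goroutine records latencies into its own histogram under its own mutex, and the stats reader swaps every histogram for a fresh one while holding that mutex, then merges the detached histograms outside any lock. The following standalone sketch (not part of the patch) illustrates that pattern with a deliberately simplified histogram type; the collector, record, and snapshot names and the count/sum-only histogram are illustrative assumptions, not the grpc benchmark stats API.

package main

import (
	"fmt"
	"sync"
)

// histogram stands in for the benchmark stats histogram; tracking only a
// count and a sum is enough to show the locking pattern.
type histogram struct {
	count int64
	sum   int64
}

func (h *histogram) add(v int64)        { h.count++; h.sum += v }
func (h *histogram) merge(o *histogram) { h.count += o.count; h.sum += o.sum }

// collector mirrors the per-goroutine mutex/histogram pairs kept by the
// benchmark client.
type collector struct {
	mutexes    []*sync.RWMutex
	histograms []*histogram
}

func newCollector(n int) *collector {
	c := &collector{
		mutexes:    make([]*sync.RWMutex, n),
		histograms: make([]*histogram, n),
	}
	for i := range c.histograms {
		c.mutexes[i] = new(sync.RWMutex)
		c.histograms[i] = new(histogram)
	}
	return c
}

// record is the hot path: worker idx adds one sample under its own lock only.
func (c *collector) record(idx int, v int64) {
	c.mutexes[idx].Lock()
	c.histograms[idx].add(v)
	c.mutexes[idx].Unlock()
}

// snapshot swaps each histogram for a fresh one while holding its mutex, then
// merges the detached histograms outside any lock, so a slow merge never
// blocks the workers.
func (c *collector) snapshot() *histogram {
	detached := make([]*histogram, len(c.histograms))
	for i := range c.histograms {
		c.mutexes[i].Lock()
		detached[i] = c.histograms[i]
		c.histograms[i] = new(histogram)
		c.mutexes[i].Unlock()
	}
	for i := 1; i < len(detached); i++ {
		detached[0].merge(detached[i])
	}
	return detached[0]
}

func main() {
	const workers = 4
	c := newCollector(workers)
	var wg sync.WaitGroup
	for idx := 0; idx < workers; idx++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			for v := int64(1); v <= 1000; v++ {
				c.record(idx, v)
			}
		}(idx)
	}
	wg.Wait()
	merged := c.snapshot()
	fmt.Println("count:", merged.count, "sum:", merged.sum)
}

The key design choice, in the patch as in the sketch, is that each critical section is reduced to a pointer swap, while the merge and the bucket copy run on histograms no worker can touch anymore.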