Merge histograms into one histogram in getStats
@@ -235,7 +235,7 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe
 			bc.mutexes[idx] = new(sync.RWMutex)
 			bc.histograms[idx] = stats.NewHistogram(bc.histogramOptions)
 			// Start goroutine on the created mutex and histogram.
-			go func(mu *sync.RWMutex, histogram *stats.Histogram) {
+			go func(idx int) {
 				// TODO: do warm up if necessary.
 				// Now relying on worker client to reserve time to do warm up.
 				// The worker client needs to wait for some time after client is created,
@@ -252,9 +252,9 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe
 							return
 						}
 						elapse := time.Since(start)
-						mu.Lock()
-						histogram.Add(int64(elapse))
-						mu.Unlock()
+						bc.mutexes[idx].Lock()
+						bc.histograms[idx].Add(int64(elapse))
+						bc.mutexes[idx].Unlock()
 						select {
 						case <-bc.stop:
 						case done <- true:
@@ -266,7 +266,7 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe
 					case <-done:
 					}
 				}
-			}(bc.mutexes[idx], bc.histograms[idx])
+			}(idx)
 		}
 	}
 }
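
Note on the closure change above: the goroutine now takes the index instead of the mutex and histogram pointers, because getStats (below) swaps bc.histograms[idx] for a fresh histogram while workers are still running. A captured *stats.Histogram would keep recording into the retired histogram after the swap; re-reading the slice through idx under the lock always hits the current one. A minimal runnable sketch of the pattern, with a plain int64 slice standing in for stats.Histogram (the client type and names here are illustrative, not the benchmark's real ones):

// sketch.go: index-capture pattern, assuming simplified stand-in types.
package main

import (
	"fmt"
	"sync"
	"time"
)

type client struct {
	mutexes   []*sync.RWMutex
	recorders [][]int64 // stand-in for []*stats.Histogram
	stop      chan bool
}

func (c *client) run(idx int) {
	for {
		select {
		case <-c.stop:
			return
		default:
		}
		start := time.Now()
		time.Sleep(time.Millisecond) // stand-in for one RPC
		elapse := time.Since(start)

		c.mutexes[idx].Lock()
		// Re-read through the index: the slot may hold a fresh recorder
		// if a snapshot happened since the last iteration.
		c.recorders[idx] = append(c.recorders[idx], int64(elapse))
		c.mutexes[idx].Unlock()
	}
}

// snapshot swaps every recorder for an empty one and returns the old ones,
// mirroring what getStats does with the histograms.
func (c *client) snapshot() [][]int64 {
	out := make([][]int64, len(c.recorders))
	for i := range c.recorders {
		c.mutexes[i].Lock()
		out[i] = c.recorders[i]
		c.recorders[i] = nil
		c.mutexes[i].Unlock()
	}
	return out
}

func main() {
	c := &client{
		mutexes:   []*sync.RWMutex{new(sync.RWMutex), new(sync.RWMutex)},
		recorders: make([][]int64, 2),
		stop:      make(chan bool),
	}
	for idx := range c.recorders {
		go c.run(idx) // pass idx, not the recorder pointer
	}
	time.Sleep(50 * time.Millisecond)
	for i, samples := range c.snapshot() {
		fmt.Printf("worker %d recorded %d samples\n", i, len(samples))
	}
	close(c.stop)
}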
@@ -291,7 +291,7 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
 			bc.mutexes[idx] = new(sync.RWMutex)
 			bc.histograms[idx] = stats.NewHistogram(bc.histogramOptions)
 			// Start goroutine on the created mutex and histogram.
-			go func(mu *sync.RWMutex, histogram *stats.Histogram) {
+			go func(idx int) {
 				// TODO: do warm up if necessary.
 				// Now relying on worker client to reserve time to do warm up.
 				// The worker client needs to wait for some time after client is created,
@@ -308,9 +308,9 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
 							return
 						}
 						elapse := time.Since(start)
-						mu.Lock()
-						histogram.Add(int64(elapse))
-						mu.Unlock()
+						bc.mutexes[idx].Lock()
+						bc.histograms[idx].Add(int64(elapse))
+						bc.mutexes[idx].Unlock()
 						select {
 						case <-bc.stop:
 						case done <- true:
@@ -322,7 +322,7 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
 					case <-done:
 					}
 				}
-			}(bc.mutexes[idx], bc.histograms[idx])
+			}(idx)
 		}
 	}
 }
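
The streaming path gets the identical change. One detail both loops share, visible in the context lines above, is the completion handshake `select { case <-bc.stop: case done <- true: }`: a worker offering a result never blocks forever when the benchmark is shutting down, because a closed stop channel makes the other select case ready. A minimal sketch of that handshake (channel names mirror the diff; the worker body is a stand-in):

// stopdone.go: non-blocking completion handshake, assuming stand-in work.
package main

import (
	"fmt"
	"time"
)

func worker(stop <-chan bool, done chan<- bool) {
	for {
		time.Sleep(time.Millisecond) // stand-in for one RPC
		select {
		case <-stop:
			// Shutdown: nobody will read done anymore, so give up the send.
			return
		case done <- true:
		}
	}
}

func main() {
	stop := make(chan bool)
	done := make(chan bool)
	for i := 0; i < 3; i++ {
		go worker(stop, done)
	}

	// Drain a few completions, then shut everything down. Workers parked
	// on the select observe the close instead of deadlocking on done.
	for i := 0; i < 10; i++ {
		<-done
	}
	close(stop)
	fmt.Println("collected 10 completions; workers unblocked by close(stop)")
}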
@@ -330,20 +330,32 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
 func (bc *benchmarkClient) getStats() *testpb.ClientStats {
 	timeElapsed := time.Since(bc.lastResetTime).Seconds()
 
-	// TODO merge histograms.
-	b := make([]uint32, len(bc.histograms[0].Buckets))
-	var totalcount int64
-	for _, h := range bc.histograms {
-		totalcount += h.Count
+	// Merging histogram may take some time.
+	// Put all histograms aside and merge later.
+	toMerge := make([]*stats.Histogram, len(bc.histograms))
+	for i := range bc.histograms {
+		bc.mutexes[i].Lock()
+		toMerge[i] = bc.histograms[i]
+		bc.histograms[i] = stats.NewHistogram(bc.histogramOptions)
+		bc.mutexes[i].Unlock()
+	}
+
+	for i := 1; i < len(toMerge); i++ {
+		toMerge[0].Merge(toMerge[i])
+	}
+
+	b := make([]uint32, len(toMerge[0].Buckets))
+	for i, v := range toMerge[0].Buckets {
+		b[i] = uint32(v.Count)
 	}
 	return &testpb.ClientStats{
 		Latencies: &testpb.HistogramData{
 			Bucket:       b,
-			MinSeen:      float64(bc.histograms[0].Min),
-			MaxSeen:      float64(bc.histograms[0].Max),
-			Sum:          float64(bc.histograms[0].Sum),
-			SumOfSquares: float64(bc.histograms[0].SumOfSquares),
-			Count:        float64(totalcount),
+			MinSeen:      float64(toMerge[0].Min),
+			MaxSeen:      float64(toMerge[0].Max),
+			Sum:          float64(toMerge[0].Sum),
+			SumOfSquares: float64(toMerge[0].SumOfSquares),
+			Count:        float64(toMerge[0].Count),
 		},
 		TimeElapsed: timeElapsed,
 		TimeUser:    0,
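
The shape of the new getStats is worth calling out: each per-goroutine lock is held only for two pointer writes (steal the old histogram, install an empty one), and the O(buckets) Merge runs with no locks held, so collecting stats barely stalls the workers. A minimal sketch of the swap-and-merge, with a toy histogram standing in for stats.Histogram (type and field names here are illustrative, not grpc's real API):

// swapmerge.go: swap-and-merge snapshot, assuming a toy histogram type.
package main

import (
	"fmt"
	"sync"
)

// hist is a stand-in for stats.Histogram.
type hist struct {
	count int64
	sum   int64
}

func (h *hist) add(v int64)   { h.count++; h.sum += v }
func (h *hist) merge(o *hist) { h.count += o.count; h.sum += o.sum }

type client struct {
	mutexes    []*sync.RWMutex
	histograms []*hist
}

func (c *client) getStats() *hist {
	// Put all histograms aside and merge later: each lock is held only
	// for the pointer swap, never for the merge itself.
	toMerge := make([]*hist, len(c.histograms))
	for i := range c.histograms {
		c.mutexes[i].Lock()
		toMerge[i] = c.histograms[i]
		c.histograms[i] = new(hist) // workers keep recording into this one
		c.mutexes[i].Unlock()
	}
	// Merge outside the critical sections.
	for i := 1; i < len(toMerge); i++ {
		toMerge[0].merge(toMerge[i])
	}
	return toMerge[0]
}

func main() {
	c := &client{
		mutexes:    []*sync.RWMutex{new(sync.RWMutex), new(sync.RWMutex)},
		histograms: []*hist{new(hist), new(hist)},
	}
	c.histograms[0].add(3)
	c.histograms[1].add(5)
	m := c.getStats()
	fmt.Printf("merged: count=%d sum=%d\n", m.count, m.sum) // count=2 sum=8
}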