Merge pull request #677 from menghanl/multiple_histogram
Multiple histogram
This commit is contained in:
@ -1,135 +0,0 @@
|
||||
package stats
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
// TimeNow is used for testing.
|
||||
TimeNow = time.Now
|
||||
)
|
||||
|
||||
const (
|
||||
hour = 0
|
||||
tenminutes = 1
|
||||
minute = 2
|
||||
)
|
||||
|
||||
// Counter is a counter that keeps track of its recent values over a given
|
||||
// period of time, and with a given resolution. Use newCounter() to instantiate.
|
||||
type Counter struct {
|
||||
mu sync.RWMutex
|
||||
ts [3]*timeseries
|
||||
lastUpdate time.Time
|
||||
}
|
||||
|
||||
// newCounter returns a new Counter.
|
||||
func newCounter() *Counter {
|
||||
now := TimeNow()
|
||||
c := &Counter{}
|
||||
c.ts[hour] = newTimeSeries(now, time.Hour, time.Minute)
|
||||
c.ts[tenminutes] = newTimeSeries(now, 10*time.Minute, 10*time.Second)
|
||||
c.ts[minute] = newTimeSeries(now, time.Minute, time.Second)
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *Counter) advance() time.Time {
|
||||
now := TimeNow()
|
||||
for _, ts := range c.ts {
|
||||
ts.advanceTime(now)
|
||||
}
|
||||
return now
|
||||
}
|
||||
|
||||
// Value returns the current value of the counter.
|
||||
func (c *Counter) Value() int64 {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
return c.ts[minute].headValue()
|
||||
}
|
||||
|
||||
// LastUpdate returns the last update time of the counter.
|
||||
func (c *Counter) LastUpdate() time.Time {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
return c.lastUpdate
|
||||
}
|
||||
|
||||
// Set updates the current value of the counter.
|
||||
func (c *Counter) Set(value int64) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.lastUpdate = c.advance()
|
||||
for _, ts := range c.ts {
|
||||
ts.set(value)
|
||||
}
|
||||
}
|
||||
|
||||
// Incr increments the current value of the counter by 'delta'.
|
||||
func (c *Counter) Incr(delta int64) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.lastUpdate = c.advance()
|
||||
for _, ts := range c.ts {
|
||||
ts.incr(delta)
|
||||
}
|
||||
}
|
||||
|
||||
// Delta1h returns the delta for the last hour.
|
||||
func (c *Counter) Delta1h() int64 {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
c.advance()
|
||||
return c.ts[hour].delta()
|
||||
}
|
||||
|
||||
// Delta10m returns the delta for the last 10 minutes.
|
||||
func (c *Counter) Delta10m() int64 {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
c.advance()
|
||||
return c.ts[tenminutes].delta()
|
||||
}
|
||||
|
||||
// Delta1m returns the delta for the last minute.
|
||||
func (c *Counter) Delta1m() int64 {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
c.advance()
|
||||
return c.ts[minute].delta()
|
||||
}
|
||||
|
||||
// Rate1h returns the rate of change of the counter in the last hour.
|
||||
func (c *Counter) Rate1h() float64 {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
c.advance()
|
||||
return c.ts[hour].rate()
|
||||
}
|
||||
|
||||
// Rate10m returns the rate of change of the counter in the last 10 minutes.
|
||||
func (c *Counter) Rate10m() float64 {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
c.advance()
|
||||
return c.ts[tenminutes].rate()
|
||||
}
|
||||
|
||||
// Rate1m returns the rate of change of the counter in the last minute.
|
||||
func (c *Counter) Rate1m() float64 {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
c.advance()
|
||||
return c.ts[minute].rate()
|
||||
}
|
||||
|
||||
// Reset resets the counter to an empty state.
|
||||
func (c *Counter) Reset() {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
now := TimeNow()
|
||||
for _, ts := range c.ts {
|
||||
ts.reset(now)
|
||||
}
|
||||
}
|
@ -4,14 +4,15 @@ import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"math"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// HistogramValue is the value of Histogram objects.
|
||||
type HistogramValue struct {
|
||||
// Histogram accumulates values in the form of a histogram with
|
||||
// exponentially increased bucket sizes.
|
||||
type Histogram struct {
|
||||
// Count is the total number of values added to the histogram.
|
||||
Count int64
|
||||
// Sum is the sum of all the values added to the histogram.
|
||||
@ -24,9 +25,30 @@ type HistogramValue struct {
|
||||
Max int64
|
||||
// Buckets contains all the buckets of the histogram.
|
||||
Buckets []HistogramBucket
|
||||
|
||||
opts HistogramOptions
|
||||
logBaseBucketSize float64
|
||||
oneOverLogOnePlusGrowthFactor float64
|
||||
}
|
||||
|
||||
// HistogramBucket is one histogram bucket.
|
||||
// HistogramOptions contains the parameters that define the histogram's buckets.
|
||||
// The first bucket of the created histogram (with index 0) contains [min, min+n)
|
||||
// where n = BaseBucketSize, min = MinValue.
|
||||
// Bucket i (i>=1) contains [min + n * m^(i-1), min + n * m^i), where m = 1+GrowthFactor.
|
||||
// The type of the values is int64.
|
||||
type HistogramOptions struct {
|
||||
// NumBuckets is the number of buckets.
|
||||
NumBuckets int
|
||||
// GrowthFactor is the growth factor of the buckets. A value of 0.1
|
||||
// indicates that bucket N+1 will be 10% larger than bucket N.
|
||||
GrowthFactor float64
|
||||
// BaseBucketSize is the size of the first bucket.
|
||||
BaseBucketSize float64
|
||||
// MinValue is the lower bound of the first bucket.
|
||||
MinValue int64
|
||||
}
|
||||
|
||||
// HistogramBucket represents one histogram bucket.
|
||||
type HistogramBucket struct {
|
||||
// LowBound is the lower bound of the bucket.
|
||||
LowBound float64
|
||||
@ -34,28 +56,56 @@ type HistogramBucket struct {
|
||||
Count int64
|
||||
}
|
||||
|
||||
// NewHistogram returns a pointer to a new Histogram object that was created
|
||||
// with the provided options.
|
||||
func NewHistogram(opts HistogramOptions) *Histogram {
|
||||
if opts.NumBuckets == 0 {
|
||||
opts.NumBuckets = 32
|
||||
}
|
||||
if opts.BaseBucketSize == 0.0 {
|
||||
opts.BaseBucketSize = 1.0
|
||||
}
|
||||
h := Histogram{
|
||||
Buckets: make([]HistogramBucket, opts.NumBuckets),
|
||||
Min: math.MaxInt64,
|
||||
Max: math.MinInt64,
|
||||
|
||||
opts: opts,
|
||||
logBaseBucketSize: math.Log(opts.BaseBucketSize),
|
||||
oneOverLogOnePlusGrowthFactor: 1 / math.Log(1+opts.GrowthFactor),
|
||||
}
|
||||
m := 1.0 + opts.GrowthFactor
|
||||
delta := opts.BaseBucketSize
|
||||
h.Buckets[0].LowBound = float64(opts.MinValue)
|
||||
for i := 1; i < opts.NumBuckets; i++ {
|
||||
h.Buckets[i].LowBound = float64(opts.MinValue) + delta
|
||||
delta = delta * m
|
||||
}
|
||||
return &h
|
||||
}
|
||||
|
||||
// Print writes textual output of the histogram values.
|
||||
func (v HistogramValue) Print(w io.Writer) {
|
||||
avg := float64(v.Sum) / float64(v.Count)
|
||||
fmt.Fprintf(w, "Count: %d Min: %d Max: %d Avg: %.2f\n", v.Count, v.Min, v.Max, avg)
|
||||
func (h *Histogram) Print(w io.Writer) {
|
||||
avg := float64(h.Sum) / float64(h.Count)
|
||||
fmt.Fprintf(w, "Count: %d Min: %d Max: %d Avg: %.2f\n", h.Count, h.Min, h.Max, avg)
|
||||
fmt.Fprintf(w, "%s\n", strings.Repeat("-", 60))
|
||||
if v.Count <= 0 {
|
||||
if h.Count <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
maxBucketDigitLen := len(strconv.FormatFloat(v.Buckets[len(v.Buckets)-1].LowBound, 'f', 6, 64))
|
||||
maxBucketDigitLen := len(strconv.FormatFloat(h.Buckets[len(h.Buckets)-1].LowBound, 'f', 6, 64))
|
||||
if maxBucketDigitLen < 3 {
|
||||
// For "inf".
|
||||
maxBucketDigitLen = 3
|
||||
}
|
||||
maxCountDigitLen := len(strconv.FormatInt(v.Count, 10))
|
||||
percentMulti := 100 / float64(v.Count)
|
||||
maxCountDigitLen := len(strconv.FormatInt(h.Count, 10))
|
||||
percentMulti := 100 / float64(h.Count)
|
||||
|
||||
accCount := int64(0)
|
||||
for i, b := range v.Buckets {
|
||||
for i, b := range h.Buckets {
|
||||
fmt.Fprintf(w, "[%*f, ", maxBucketDigitLen, b.LowBound)
|
||||
if i+1 < len(v.Buckets) {
|
||||
fmt.Fprintf(w, "%*f)", maxBucketDigitLen, v.Buckets[i+1].LowBound)
|
||||
if i+1 < len(h.Buckets) {
|
||||
fmt.Fprintf(w, "%*f)", maxBucketDigitLen, h.Buckets[i+1].LowBound)
|
||||
} else {
|
||||
fmt.Fprintf(w, "%*s)", maxBucketDigitLen, "inf")
|
||||
}
|
||||
@ -70,89 +120,21 @@ func (v HistogramValue) Print(w io.Writer) {
|
||||
}
|
||||
|
||||
// String returns the textual output of the histogram values as string.
|
||||
func (v HistogramValue) String() string {
|
||||
func (h *Histogram) String() string {
|
||||
var b bytes.Buffer
|
||||
v.Print(&b)
|
||||
h.Print(&b)
|
||||
return b.String()
|
||||
}
|
||||
|
||||
// Histogram accumulates values in the form of a histogram with
|
||||
// exponentially increased bucket sizes.
|
||||
// The first bucket (with index 0) is [0, n) where n = baseBucketSize.
|
||||
// Bucket i (i>=1) contains [n * m^(i-1), n * m^i), where m = 1 + GrowthFactor.
|
||||
// The type of the values is int64.
|
||||
type Histogram struct {
|
||||
opts HistogramOptions
|
||||
buckets []bucketInternal
|
||||
count *Counter
|
||||
sum *Counter
|
||||
sumOfSquares *Counter
|
||||
tracker *Tracker
|
||||
|
||||
logBaseBucketSize float64
|
||||
oneOverLogOnePlusGrowthFactor float64
|
||||
}
|
||||
|
||||
// HistogramOptions contains the parameters that define the histogram's buckets.
|
||||
type HistogramOptions struct {
|
||||
// NumBuckets is the number of buckets.
|
||||
NumBuckets int
|
||||
// GrowthFactor is the growth factor of the buckets. A value of 0.1
|
||||
// indicates that bucket N+1 will be 10% larger than bucket N.
|
||||
GrowthFactor float64
|
||||
// BaseBucketSize is the size of the first bucket.
|
||||
BaseBucketSize float64
|
||||
// MinValue is the lower bound of the first bucket.
|
||||
MinValue int64
|
||||
}
|
||||
|
||||
// bucketInternal is the internal representation of a bucket, which includes a
|
||||
// rate counter.
|
||||
type bucketInternal struct {
|
||||
lowBound float64
|
||||
count *Counter
|
||||
}
|
||||
|
||||
// NewHistogram returns a pointer to a new Histogram object that was created
|
||||
// with the provided options.
|
||||
func NewHistogram(opts HistogramOptions) *Histogram {
|
||||
if opts.NumBuckets == 0 {
|
||||
opts.NumBuckets = 32
|
||||
}
|
||||
if opts.BaseBucketSize == 0.0 {
|
||||
opts.BaseBucketSize = 1.0
|
||||
}
|
||||
h := Histogram{
|
||||
opts: opts,
|
||||
buckets: make([]bucketInternal, opts.NumBuckets),
|
||||
count: newCounter(),
|
||||
sum: newCounter(),
|
||||
sumOfSquares: newCounter(),
|
||||
tracker: newTracker(),
|
||||
|
||||
logBaseBucketSize: math.Log(opts.BaseBucketSize),
|
||||
oneOverLogOnePlusGrowthFactor: 1 / math.Log(1+opts.GrowthFactor),
|
||||
}
|
||||
m := 1.0 + opts.GrowthFactor
|
||||
delta := opts.BaseBucketSize
|
||||
h.buckets[0].lowBound = float64(opts.MinValue)
|
||||
h.buckets[0].count = newCounter()
|
||||
for i := 1; i < opts.NumBuckets; i++ {
|
||||
h.buckets[i].lowBound = float64(opts.MinValue) + delta
|
||||
h.buckets[i].count = newCounter()
|
||||
delta = delta * m
|
||||
}
|
||||
return &h
|
||||
}
|
||||
|
||||
// Clear resets all the content of histogram.
|
||||
func (h *Histogram) Clear() {
|
||||
h.count = newCounter()
|
||||
h.sum = newCounter()
|
||||
h.sumOfSquares = newCounter()
|
||||
h.tracker = newTracker()
|
||||
for _, v := range h.buckets {
|
||||
v.count = newCounter()
|
||||
h.Count = 0
|
||||
h.Sum = 0
|
||||
h.SumOfSquares = 0
|
||||
h.Min = math.MaxInt64
|
||||
h.Max = math.MinInt64
|
||||
for _, v := range h.Buckets {
|
||||
v.Count = 0
|
||||
}
|
||||
}
|
||||
|
||||
@ -167,103 +149,19 @@ func (h *Histogram) Add(value int64) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
h.buckets[bucket].count.Incr(1)
|
||||
h.count.Incr(1)
|
||||
h.sum.Incr(value)
|
||||
h.sumOfSquares.Incr(value * value)
|
||||
h.tracker.Push(value)
|
||||
h.Buckets[bucket].Count++
|
||||
h.Count++
|
||||
h.Sum += value
|
||||
h.SumOfSquares += value * value
|
||||
if value < h.Min {
|
||||
h.Min = value
|
||||
}
|
||||
if value > h.Max {
|
||||
h.Max = value
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// LastUpdate returns the time at which the object was last updated.
|
||||
func (h *Histogram) LastUpdate() time.Time {
|
||||
return h.count.LastUpdate()
|
||||
}
|
||||
|
||||
// Value returns the accumulated state of the histogram since it was created.
|
||||
func (h *Histogram) Value() HistogramValue {
|
||||
b := make([]HistogramBucket, len(h.buckets))
|
||||
for i, v := range h.buckets {
|
||||
b[i] = HistogramBucket{
|
||||
LowBound: v.lowBound,
|
||||
Count: v.count.Value(),
|
||||
}
|
||||
}
|
||||
|
||||
v := HistogramValue{
|
||||
Count: h.count.Value(),
|
||||
Sum: h.sum.Value(),
|
||||
SumOfSquares: h.sumOfSquares.Value(),
|
||||
Min: h.tracker.Min(),
|
||||
Max: h.tracker.Max(),
|
||||
Buckets: b,
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
// Delta1h returns the change in the last hour.
|
||||
func (h *Histogram) Delta1h() HistogramValue {
|
||||
b := make([]HistogramBucket, len(h.buckets))
|
||||
for i, v := range h.buckets {
|
||||
b[i] = HistogramBucket{
|
||||
LowBound: v.lowBound,
|
||||
Count: v.count.Delta1h(),
|
||||
}
|
||||
}
|
||||
|
||||
v := HistogramValue{
|
||||
Count: h.count.Delta1h(),
|
||||
Sum: h.sum.Delta1h(),
|
||||
SumOfSquares: h.sumOfSquares.Delta1h(),
|
||||
Min: h.tracker.Min1h(),
|
||||
Max: h.tracker.Max1h(),
|
||||
Buckets: b,
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
// Delta10m returns the change in the last 10 minutes.
|
||||
func (h *Histogram) Delta10m() HistogramValue {
|
||||
b := make([]HistogramBucket, len(h.buckets))
|
||||
for i, v := range h.buckets {
|
||||
b[i] = HistogramBucket{
|
||||
LowBound: v.lowBound,
|
||||
Count: v.count.Delta10m(),
|
||||
}
|
||||
}
|
||||
|
||||
v := HistogramValue{
|
||||
Count: h.count.Delta10m(),
|
||||
Sum: h.sum.Delta10m(),
|
||||
SumOfSquares: h.sumOfSquares.Delta10m(),
|
||||
Min: h.tracker.Min10m(),
|
||||
Max: h.tracker.Max10m(),
|
||||
Buckets: b,
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
// Delta1m returns the change in the last 10 minutes.
|
||||
func (h *Histogram) Delta1m() HistogramValue {
|
||||
b := make([]HistogramBucket, len(h.buckets))
|
||||
for i, v := range h.buckets {
|
||||
b[i] = HistogramBucket{
|
||||
LowBound: v.lowBound,
|
||||
Count: v.count.Delta1m(),
|
||||
}
|
||||
}
|
||||
|
||||
v := HistogramValue{
|
||||
Count: h.count.Delta1m(),
|
||||
Sum: h.sum.Delta1m(),
|
||||
SumOfSquares: h.sumOfSquares.Delta1m(),
|
||||
Min: h.tracker.Min1m(),
|
||||
Max: h.tracker.Max1m(),
|
||||
Buckets: b,
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func (h *Histogram) findBucket(value int64) (int, error) {
|
||||
delta := float64(value - h.opts.MinValue)
|
||||
var b int
|
||||
@ -273,8 +171,28 @@ func (h *Histogram) findBucket(value int64) (int, error) {
|
||||
// = (log(delta) - log(baseBucketSize)) * (1 / log(1+growthFactor)) + 1
|
||||
b = int((math.Log(delta)-h.logBaseBucketSize)*h.oneOverLogOnePlusGrowthFactor + 1)
|
||||
}
|
||||
if b >= len(h.buckets) {
|
||||
if b >= len(h.Buckets) {
|
||||
return 0, fmt.Errorf("no bucket for value: %d", value)
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// Merge takes another histogram h2, and merges its content into h.
|
||||
// The two histograms must be created by equivalent HistogramOptions.
|
||||
func (h *Histogram) Merge(h2 *Histogram) {
|
||||
if h.opts != h2.opts {
|
||||
log.Fatalf("failed to merge histograms, created by inequivalent options")
|
||||
}
|
||||
h.Count += h2.Count
|
||||
h.Sum += h2.Sum
|
||||
h.SumOfSquares += h2.SumOfSquares
|
||||
if h2.Min < h.Min {
|
||||
h.Min = h2.Min
|
||||
}
|
||||
if h2.Max > h.Max {
|
||||
h.Max = h2.Max
|
||||
}
|
||||
for i, b := range h2.Buckets {
|
||||
h.Buckets[i].Count += b.Count
|
||||
}
|
||||
}
|
||||
|
@ -104,7 +104,7 @@ func (stats *Stats) Print(w io.Writer) {
|
||||
fmt.Fprint(w, "Histogram (empty)\n")
|
||||
} else {
|
||||
fmt.Fprintf(w, "Histogram (unit: %s)\n", fmt.Sprintf("%v", stats.unit)[1:])
|
||||
stats.histogram.Value().Print(w)
|
||||
stats.histogram.Print(w)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,154 +0,0 @@
|
||||
package stats
|
||||
|
||||
import (
|
||||
"math"
|
||||
"time"
|
||||
)
|
||||
|
||||
// timeseries holds the history of a changing value over a predefined period of
|
||||
// time.
|
||||
type timeseries struct {
|
||||
size int // The number of time slots. Equivalent to len(slots).
|
||||
resolution time.Duration // The time resolution of each slot.
|
||||
stepCount int64 // The number of intervals seen since creation.
|
||||
head int // The position of the current time in slots.
|
||||
time time.Time // The time at the beginning of the current time slot.
|
||||
slots []int64 // A circular buffer of time slots.
|
||||
}
|
||||
|
||||
// newTimeSeries returns a newly allocated timeseries that covers the requested
|
||||
// period with the given resolution.
|
||||
func newTimeSeries(initialTime time.Time, period, resolution time.Duration) *timeseries {
|
||||
size := int(period.Nanoseconds()/resolution.Nanoseconds()) + 1
|
||||
return ×eries{
|
||||
size: size,
|
||||
resolution: resolution,
|
||||
stepCount: 1,
|
||||
time: initialTime,
|
||||
slots: make([]int64, size),
|
||||
}
|
||||
}
|
||||
|
||||
// advanceTimeWithFill moves the timeseries forward to time t and fills in any
|
||||
// slots that get skipped in the process with the given value. Values older than
|
||||
// the timeseries period are lost.
|
||||
func (ts *timeseries) advanceTimeWithFill(t time.Time, value int64) {
|
||||
advanceTo := t.Truncate(ts.resolution)
|
||||
if !advanceTo.After(ts.time) {
|
||||
// This is shortcut for the most common case of a busy counter
|
||||
// where updates come in many times per ts.resolution.
|
||||
ts.time = advanceTo
|
||||
return
|
||||
}
|
||||
steps := int(advanceTo.Sub(ts.time).Nanoseconds() / ts.resolution.Nanoseconds())
|
||||
ts.stepCount += int64(steps)
|
||||
if steps > ts.size {
|
||||
steps = ts.size
|
||||
}
|
||||
for steps > 0 {
|
||||
ts.head = (ts.head + 1) % ts.size
|
||||
ts.slots[ts.head] = value
|
||||
steps--
|
||||
}
|
||||
ts.time = advanceTo
|
||||
}
|
||||
|
||||
// advanceTime moves the timeseries forward to time t and fills in any slots
|
||||
// that get skipped in the process with the head value. Values older than the
|
||||
// timeseries period are lost.
|
||||
func (ts *timeseries) advanceTime(t time.Time) {
|
||||
ts.advanceTimeWithFill(t, ts.slots[ts.head])
|
||||
}
|
||||
|
||||
// set sets the current value of the timeseries.
|
||||
func (ts *timeseries) set(value int64) {
|
||||
ts.slots[ts.head] = value
|
||||
}
|
||||
|
||||
// incr sets the current value of the timeseries.
|
||||
func (ts *timeseries) incr(delta int64) {
|
||||
ts.slots[ts.head] += delta
|
||||
}
|
||||
|
||||
// headValue returns the latest value from the timeseries.
|
||||
func (ts *timeseries) headValue() int64 {
|
||||
return ts.slots[ts.head]
|
||||
}
|
||||
|
||||
// headTime returns the time of the latest value from the timeseries.
|
||||
func (ts *timeseries) headTime() time.Time {
|
||||
return ts.time
|
||||
}
|
||||
|
||||
// tailValue returns the oldest value from the timeseries.
|
||||
func (ts *timeseries) tailValue() int64 {
|
||||
if ts.stepCount < int64(ts.size) {
|
||||
return 0
|
||||
}
|
||||
return ts.slots[(ts.head+1)%ts.size]
|
||||
}
|
||||
|
||||
// tailTime returns the time of the oldest value from the timeseries.
|
||||
func (ts *timeseries) tailTime() time.Time {
|
||||
size := int64(ts.size)
|
||||
if ts.stepCount < size {
|
||||
size = ts.stepCount
|
||||
}
|
||||
return ts.time.Add(-time.Duration(size-1) * ts.resolution)
|
||||
}
|
||||
|
||||
// delta returns the difference between the newest and oldest values from the
|
||||
// timeseries.
|
||||
func (ts *timeseries) delta() int64 {
|
||||
return ts.headValue() - ts.tailValue()
|
||||
}
|
||||
|
||||
// rate returns the rate of change between the oldest and newest values from
|
||||
// the timeseries in units per second.
|
||||
func (ts *timeseries) rate() float64 {
|
||||
deltaTime := ts.headTime().Sub(ts.tailTime()).Seconds()
|
||||
if deltaTime == 0 {
|
||||
return 0
|
||||
}
|
||||
return float64(ts.delta()) / deltaTime
|
||||
}
|
||||
|
||||
// min returns the smallest value from the timeseries.
|
||||
func (ts *timeseries) min() int64 {
|
||||
to := ts.size
|
||||
if ts.stepCount < int64(ts.size) {
|
||||
to = ts.head + 1
|
||||
}
|
||||
tail := (ts.head + 1) % ts.size
|
||||
min := int64(math.MaxInt64)
|
||||
for b := 0; b < to; b++ {
|
||||
if b != tail && ts.slots[b] < min {
|
||||
min = ts.slots[b]
|
||||
}
|
||||
}
|
||||
return min
|
||||
}
|
||||
|
||||
// max returns the largest value from the timeseries.
|
||||
func (ts *timeseries) max() int64 {
|
||||
to := ts.size
|
||||
if ts.stepCount < int64(ts.size) {
|
||||
to = ts.head + 1
|
||||
}
|
||||
tail := (ts.head + 1) % ts.size
|
||||
max := int64(math.MinInt64)
|
||||
for b := 0; b < to; b++ {
|
||||
if b != tail && ts.slots[b] > max {
|
||||
max = ts.slots[b]
|
||||
}
|
||||
}
|
||||
return max
|
||||
}
|
||||
|
||||
// reset resets the timeseries to an empty state.
|
||||
func (ts *timeseries) reset(t time.Time) {
|
||||
ts.head = 0
|
||||
ts.time = t
|
||||
ts.stepCount = 1
|
||||
ts.slots = make([]int64, ts.size)
|
||||
}
|
@ -1,159 +0,0 @@
|
||||
package stats
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Tracker is a min/max value tracker that keeps track of its min/max values
|
||||
// over a given period of time, and with a given resolution. The initial min
|
||||
// and max values are math.MaxInt64 and math.MinInt64 respectively.
|
||||
type Tracker struct {
|
||||
mu sync.RWMutex
|
||||
min, max int64 // All time min/max.
|
||||
minTS, maxTS [3]*timeseries
|
||||
lastUpdate time.Time
|
||||
}
|
||||
|
||||
// newTracker returns a new Tracker.
|
||||
func newTracker() *Tracker {
|
||||
now := TimeNow()
|
||||
t := &Tracker{}
|
||||
t.minTS[hour] = newTimeSeries(now, time.Hour, time.Minute)
|
||||
t.minTS[tenminutes] = newTimeSeries(now, 10*time.Minute, 10*time.Second)
|
||||
t.minTS[minute] = newTimeSeries(now, time.Minute, time.Second)
|
||||
t.maxTS[hour] = newTimeSeries(now, time.Hour, time.Minute)
|
||||
t.maxTS[tenminutes] = newTimeSeries(now, 10*time.Minute, 10*time.Second)
|
||||
t.maxTS[minute] = newTimeSeries(now, time.Minute, time.Second)
|
||||
t.init()
|
||||
return t
|
||||
}
|
||||
|
||||
func (t *Tracker) init() {
|
||||
t.min = math.MaxInt64
|
||||
t.max = math.MinInt64
|
||||
for _, ts := range t.minTS {
|
||||
ts.set(math.MaxInt64)
|
||||
}
|
||||
for _, ts := range t.maxTS {
|
||||
ts.set(math.MinInt64)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Tracker) advance() time.Time {
|
||||
now := TimeNow()
|
||||
for _, ts := range t.minTS {
|
||||
ts.advanceTimeWithFill(now, math.MaxInt64)
|
||||
}
|
||||
for _, ts := range t.maxTS {
|
||||
ts.advanceTimeWithFill(now, math.MinInt64)
|
||||
}
|
||||
return now
|
||||
}
|
||||
|
||||
// LastUpdate returns the last update time of the range.
|
||||
func (t *Tracker) LastUpdate() time.Time {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
return t.lastUpdate
|
||||
}
|
||||
|
||||
// Push adds a new value if it is a new minimum or maximum.
|
||||
func (t *Tracker) Push(value int64) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.lastUpdate = t.advance()
|
||||
if t.min > value {
|
||||
t.min = value
|
||||
}
|
||||
if t.max < value {
|
||||
t.max = value
|
||||
}
|
||||
for _, ts := range t.minTS {
|
||||
if ts.headValue() > value {
|
||||
ts.set(value)
|
||||
}
|
||||
}
|
||||
for _, ts := range t.maxTS {
|
||||
if ts.headValue() < value {
|
||||
ts.set(value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Min returns the minimum value of the tracker
|
||||
func (t *Tracker) Min() int64 {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
return t.min
|
||||
}
|
||||
|
||||
// Max returns the maximum value of the tracker.
|
||||
func (t *Tracker) Max() int64 {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
return t.max
|
||||
}
|
||||
|
||||
// Min1h returns the minimum value for the last hour.
|
||||
func (t *Tracker) Min1h() int64 {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.advance()
|
||||
return t.minTS[hour].min()
|
||||
}
|
||||
|
||||
// Max1h returns the maximum value for the last hour.
|
||||
func (t *Tracker) Max1h() int64 {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.advance()
|
||||
return t.maxTS[hour].max()
|
||||
}
|
||||
|
||||
// Min10m returns the minimum value for the last 10 minutes.
|
||||
func (t *Tracker) Min10m() int64 {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.advance()
|
||||
return t.minTS[tenminutes].min()
|
||||
}
|
||||
|
||||
// Max10m returns the maximum value for the last 10 minutes.
|
||||
func (t *Tracker) Max10m() int64 {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.advance()
|
||||
return t.maxTS[tenminutes].max()
|
||||
}
|
||||
|
||||
// Min1m returns the minimum value for the last 1 minute.
|
||||
func (t *Tracker) Min1m() int64 {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.advance()
|
||||
return t.minTS[minute].min()
|
||||
}
|
||||
|
||||
// Max1m returns the maximum value for the last 1 minute.
|
||||
func (t *Tracker) Max1m() int64 {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.advance()
|
||||
return t.maxTS[minute].max()
|
||||
}
|
||||
|
||||
// Reset resets the range to an empty state.
|
||||
func (t *Tracker) Reset() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
now := TimeNow()
|
||||
for _, ts := range t.minTS {
|
||||
ts.reset(now)
|
||||
}
|
||||
for _, ts := range t.maxTS {
|
||||
ts.reset(now)
|
||||
}
|
||||
t.init()
|
||||
}
|
@ -53,12 +53,38 @@ var (
|
||||
caFile = "benchmark/server/testdata/ca.pem"
|
||||
)
|
||||
|
||||
type lockingHistogram struct {
|
||||
mu sync.Mutex
|
||||
histogram *stats.Histogram
|
||||
}
|
||||
|
||||
func (h *lockingHistogram) add(value int64) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.histogram.Add(value)
|
||||
}
|
||||
|
||||
// swap sets h.histogram to new, and returns its old value.
|
||||
func (h *lockingHistogram) swap(new *stats.Histogram) *stats.Histogram {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
old := h.histogram
|
||||
h.histogram = new
|
||||
return old
|
||||
}
|
||||
|
||||
func (h *lockingHistogram) mergeInto(merged *stats.Histogram) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
merged.Merge(h.histogram)
|
||||
}
|
||||
|
||||
type benchmarkClient struct {
|
||||
closeConns func()
|
||||
stop chan bool
|
||||
mu sync.RWMutex
|
||||
lastResetTime time.Time
|
||||
histogram *stats.Histogram
|
||||
closeConns func()
|
||||
stop chan bool
|
||||
lastResetTime time.Time
|
||||
histogramOptions stats.HistogramOptions
|
||||
lockingHistograms []lockingHistogram
|
||||
}
|
||||
|
||||
func printClientConfig(config *testpb.ClientConfig) {
|
||||
@ -131,7 +157,7 @@ func createConns(config *testpb.ClientConfig) ([]*grpc.ClientConn, func(), error
|
||||
|
||||
// Create connections.
|
||||
connCount := int(config.ClientChannels)
|
||||
conns := make([]*grpc.ClientConn, connCount)
|
||||
conns := make([]*grpc.ClientConn, connCount, connCount)
|
||||
for connIndex := 0; connIndex < connCount; connIndex++ {
|
||||
conns[connIndex] = benchmark.NewClientConn(config.ServerTargets[connIndex%len(config.ServerTargets)], opts...)
|
||||
}
|
||||
@ -200,13 +226,16 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rpcCountPerConn := int(config.OutstandingRpcsPerChannel)
|
||||
bc := &benchmarkClient{
|
||||
histogram: stats.NewHistogram(stats.HistogramOptions{
|
||||
histogramOptions: stats.HistogramOptions{
|
||||
NumBuckets: int(math.Log(config.HistogramParams.MaxPossible)/math.Log(1+config.HistogramParams.Resolution)) + 1,
|
||||
GrowthFactor: config.HistogramParams.Resolution,
|
||||
BaseBucketSize: (1 + config.HistogramParams.Resolution),
|
||||
MinValue: 0,
|
||||
}),
|
||||
},
|
||||
lockingHistograms: make([]lockingHistogram, rpcCountPerConn*len(conns), rpcCountPerConn*len(conns)),
|
||||
|
||||
stop: make(chan bool),
|
||||
lastResetTime: time.Now(),
|
||||
closeConns: closeConns,
|
||||
@ -222,11 +251,15 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error)
|
||||
}
|
||||
|
||||
func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int) {
|
||||
for _, conn := range conns {
|
||||
for ic, conn := range conns {
|
||||
client := testpb.NewBenchmarkServiceClient(conn)
|
||||
// For each connection, create rpcCountPerConn goroutines to do rpc.
|
||||
for j := 0; j < rpcCountPerConn; j++ {
|
||||
go func() {
|
||||
// Create histogram for each goroutine.
|
||||
idx := ic*rpcCountPerConn + j
|
||||
bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions)
|
||||
// Start goroutine on the created mutex and histogram.
|
||||
go func(idx int) {
|
||||
// TODO: do warm up if necessary.
|
||||
// Now relying on worker client to reserve time to do warm up.
|
||||
// The worker client needs to wait for some time after client is created,
|
||||
@ -243,9 +276,7 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe
|
||||
return
|
||||
}
|
||||
elapse := time.Since(start)
|
||||
bc.mu.Lock()
|
||||
bc.histogram.Add(int64(elapse))
|
||||
bc.mu.Unlock()
|
||||
bc.lockingHistograms[idx].add(int64(elapse))
|
||||
select {
|
||||
case <-bc.stop:
|
||||
case done <- true:
|
||||
@ -257,7 +288,7 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe
|
||||
case <-done:
|
||||
}
|
||||
}
|
||||
}()
|
||||
}(idx)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -269,7 +300,7 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
|
||||
} else {
|
||||
doRPC = benchmark.DoStreamingRoundTrip
|
||||
}
|
||||
for _, conn := range conns {
|
||||
for ic, conn := range conns {
|
||||
// For each connection, create rpcCountPerConn goroutines to do rpc.
|
||||
for j := 0; j < rpcCountPerConn; j++ {
|
||||
c := testpb.NewBenchmarkServiceClient(conn)
|
||||
@ -277,8 +308,11 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
|
||||
if err != nil {
|
||||
grpclog.Fatalf("%v.StreamingCall(_) = _, %v", c, err)
|
||||
}
|
||||
// Create benchmark rpc goroutine.
|
||||
go func() {
|
||||
// Create histogram for each goroutine.
|
||||
idx := ic*rpcCountPerConn + j
|
||||
bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions)
|
||||
// Start goroutine on the created mutex and histogram.
|
||||
go func(idx int) {
|
||||
// TODO: do warm up if necessary.
|
||||
// Now relying on worker client to reserve time to do warm up.
|
||||
// The worker client needs to wait for some time after client is created,
|
||||
@ -295,9 +329,7 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
|
||||
return
|
||||
}
|
||||
elapse := time.Since(start)
|
||||
bc.mu.Lock()
|
||||
bc.histogram.Add(int64(elapse))
|
||||
bc.mu.Unlock()
|
||||
bc.lockingHistograms[idx].add(int64(elapse))
|
||||
select {
|
||||
case <-bc.stop:
|
||||
case done <- true:
|
||||
@ -309,29 +341,51 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
|
||||
case <-done:
|
||||
}
|
||||
}
|
||||
}()
|
||||
}(idx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (bc *benchmarkClient) getStats() *testpb.ClientStats {
|
||||
bc.mu.RLock()
|
||||
defer bc.mu.RUnlock()
|
||||
timeElapsed := time.Since(bc.lastResetTime).Seconds()
|
||||
// getStats returns the stats for benchmark client.
|
||||
// It resets lastResetTime and all histograms if argument reset is true.
|
||||
func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats {
|
||||
var timeElapsed float64
|
||||
mergedHistogram := stats.NewHistogram(bc.histogramOptions)
|
||||
|
||||
histogramValue := bc.histogram.Value()
|
||||
b := make([]uint32, len(histogramValue.Buckets))
|
||||
for i, v := range histogramValue.Buckets {
|
||||
if reset {
|
||||
// Merging histogram may take some time.
|
||||
// Put all histograms aside and merge later.
|
||||
toMerge := make([]*stats.Histogram, len(bc.lockingHistograms), len(bc.lockingHistograms))
|
||||
for i := range bc.lockingHistograms {
|
||||
toMerge[i] = bc.lockingHistograms[i].swap(stats.NewHistogram(bc.histogramOptions))
|
||||
}
|
||||
|
||||
for i := 0; i < len(toMerge); i++ {
|
||||
mergedHistogram.Merge(toMerge[i])
|
||||
}
|
||||
|
||||
timeElapsed = time.Since(bc.lastResetTime).Seconds()
|
||||
bc.lastResetTime = time.Now()
|
||||
} else {
|
||||
// Merge only, not reset.
|
||||
for i := range bc.lockingHistograms {
|
||||
bc.lockingHistograms[i].mergeInto(mergedHistogram)
|
||||
}
|
||||
timeElapsed = time.Since(bc.lastResetTime).Seconds()
|
||||
}
|
||||
|
||||
b := make([]uint32, len(mergedHistogram.Buckets), len(mergedHistogram.Buckets))
|
||||
for i, v := range mergedHistogram.Buckets {
|
||||
b[i] = uint32(v.Count)
|
||||
}
|
||||
return &testpb.ClientStats{
|
||||
Latencies: &testpb.HistogramData{
|
||||
Bucket: b,
|
||||
MinSeen: float64(histogramValue.Min),
|
||||
MaxSeen: float64(histogramValue.Max),
|
||||
Sum: float64(histogramValue.Sum),
|
||||
SumOfSquares: float64(histogramValue.SumOfSquares),
|
||||
Count: float64(histogramValue.Count),
|
||||
MinSeen: float64(mergedHistogram.Min),
|
||||
MaxSeen: float64(mergedHistogram.Max),
|
||||
Sum: float64(mergedHistogram.Sum),
|
||||
SumOfSquares: float64(mergedHistogram.SumOfSquares),
|
||||
Count: float64(mergedHistogram.Count),
|
||||
},
|
||||
TimeElapsed: timeElapsed,
|
||||
TimeUser: 0,
|
||||
@ -339,15 +393,6 @@ func (bc *benchmarkClient) getStats() *testpb.ClientStats {
|
||||
}
|
||||
}
|
||||
|
||||
// reset clears the contents for histogram and set lastResetTime to Now().
|
||||
// It is called to get ready for benchmark runs.
|
||||
func (bc *benchmarkClient) reset() {
|
||||
bc.mu.Lock()
|
||||
defer bc.mu.Unlock()
|
||||
bc.lastResetTime = time.Now()
|
||||
bc.histogram.Clear()
|
||||
}
|
||||
|
||||
func (bc *benchmarkClient) shutdown() {
|
||||
close(bc.stop)
|
||||
bc.closeConns()
|
||||
|
@ -159,15 +159,15 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma
|
||||
return &benchmarkServer{port: p, cores: numOfCores, closeFunc: closeFunc, lastResetTime: time.Now()}, nil
|
||||
}
|
||||
|
||||
func (bs *benchmarkServer) getStats() *testpb.ServerStats {
|
||||
// getStats returns the stats for benchmark server.
|
||||
// It resets lastResetTime if argument reset is true.
|
||||
func (bs *benchmarkServer) getStats(reset bool) *testpb.ServerStats {
|
||||
// TODO wall time, sys time, user time.
|
||||
bs.mu.RLock()
|
||||
defer bs.mu.RUnlock()
|
||||
return &testpb.ServerStats{TimeElapsed: time.Since(bs.lastResetTime).Seconds(), TimeUser: 0, TimeSystem: 0}
|
||||
}
|
||||
|
||||
func (bs *benchmarkServer) reset() {
|
||||
bs.mu.Lock()
|
||||
defer bs.mu.Unlock()
|
||||
bs.lastResetTime = time.Now()
|
||||
timeElapsed := time.Since(bs.lastResetTime).Seconds()
|
||||
if reset {
|
||||
bs.lastResetTime = time.Now()
|
||||
}
|
||||
return &testpb.ServerStats{TimeElapsed: timeElapsed, TimeUser: 0, TimeSystem: 0}
|
||||
}
|
||||
|
@ -116,7 +116,7 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er
|
||||
return err
|
||||
}
|
||||
out = &testpb.ServerStatus{
|
||||
Stats: bs.getStats(),
|
||||
Stats: bs.getStats(false),
|
||||
Port: int32(bs.port),
|
||||
Cores: int32(bs.cores),
|
||||
}
|
||||
@ -128,13 +128,10 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er
|
||||
return grpc.Errorf(codes.InvalidArgument, "server does not exist when mark received")
|
||||
}
|
||||
out = &testpb.ServerStatus{
|
||||
Stats: bs.getStats(),
|
||||
Stats: bs.getStats(argtype.Mark.Reset_),
|
||||
Port: int32(bs.port),
|
||||
Cores: int32(bs.cores),
|
||||
}
|
||||
if argtype.Mark.Reset_ {
|
||||
bs.reset()
|
||||
}
|
||||
}
|
||||
|
||||
if err := stream.Send(out); err != nil {
|
||||
@ -176,7 +173,7 @@ func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) er
|
||||
return err
|
||||
}
|
||||
out = &testpb.ClientStatus{
|
||||
Stats: bc.getStats(),
|
||||
Stats: bc.getStats(false),
|
||||
}
|
||||
|
||||
case *testpb.ClientArgs_Mark:
|
||||
@ -186,10 +183,7 @@ func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) er
|
||||
return grpc.Errorf(codes.InvalidArgument, "client does not exist when mark received")
|
||||
}
|
||||
out = &testpb.ClientStatus{
|
||||
Stats: bc.getStats(),
|
||||
}
|
||||
if t.Mark.Reset_ {
|
||||
bc.reset()
|
||||
Stats: bc.getStats(t.Mark.Reset_),
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user