diff --git a/benchmark/stats/counter.go b/benchmark/stats/counter.go deleted file mode 100644 index 4389bae3..00000000 --- a/benchmark/stats/counter.go +++ /dev/null @@ -1,135 +0,0 @@ -package stats - -import ( - "sync" - "time" -) - -var ( - // TimeNow is used for testing. - TimeNow = time.Now -) - -const ( - hour = 0 - tenminutes = 1 - minute = 2 -) - -// Counter is a counter that keeps track of its recent values over a given -// period of time, and with a given resolution. Use newCounter() to instantiate. -type Counter struct { - mu sync.RWMutex - ts [3]*timeseries - lastUpdate time.Time -} - -// newCounter returns a new Counter. -func newCounter() *Counter { - now := TimeNow() - c := &Counter{} - c.ts[hour] = newTimeSeries(now, time.Hour, time.Minute) - c.ts[tenminutes] = newTimeSeries(now, 10*time.Minute, 10*time.Second) - c.ts[minute] = newTimeSeries(now, time.Minute, time.Second) - return c -} - -func (c *Counter) advance() time.Time { - now := TimeNow() - for _, ts := range c.ts { - ts.advanceTime(now) - } - return now -} - -// Value returns the current value of the counter. -func (c *Counter) Value() int64 { - c.mu.RLock() - defer c.mu.RUnlock() - return c.ts[minute].headValue() -} - -// LastUpdate returns the last update time of the counter. -func (c *Counter) LastUpdate() time.Time { - c.mu.RLock() - defer c.mu.RUnlock() - return c.lastUpdate -} - -// Set updates the current value of the counter. -func (c *Counter) Set(value int64) { - c.mu.Lock() - defer c.mu.Unlock() - c.lastUpdate = c.advance() - for _, ts := range c.ts { - ts.set(value) - } -} - -// Incr increments the current value of the counter by 'delta'. -func (c *Counter) Incr(delta int64) { - c.mu.Lock() - defer c.mu.Unlock() - c.lastUpdate = c.advance() - for _, ts := range c.ts { - ts.incr(delta) - } -} - -// Delta1h returns the delta for the last hour. -func (c *Counter) Delta1h() int64 { - c.mu.RLock() - defer c.mu.RUnlock() - c.advance() - return c.ts[hour].delta() -} - -// Delta10m returns the delta for the last 10 minutes. -func (c *Counter) Delta10m() int64 { - c.mu.RLock() - defer c.mu.RUnlock() - c.advance() - return c.ts[tenminutes].delta() -} - -// Delta1m returns the delta for the last minute. -func (c *Counter) Delta1m() int64 { - c.mu.RLock() - defer c.mu.RUnlock() - c.advance() - return c.ts[minute].delta() -} - -// Rate1h returns the rate of change of the counter in the last hour. -func (c *Counter) Rate1h() float64 { - c.mu.RLock() - defer c.mu.RUnlock() - c.advance() - return c.ts[hour].rate() -} - -// Rate10m returns the rate of change of the counter in the last 10 minutes. -func (c *Counter) Rate10m() float64 { - c.mu.RLock() - defer c.mu.RUnlock() - c.advance() - return c.ts[tenminutes].rate() -} - -// Rate1m returns the rate of change of the counter in the last minute. -func (c *Counter) Rate1m() float64 { - c.mu.RLock() - defer c.mu.RUnlock() - c.advance() - return c.ts[minute].rate() -} - -// Reset resets the counter to an empty state. -func (c *Counter) Reset() { - c.mu.Lock() - defer c.mu.Unlock() - now := TimeNow() - for _, ts := range c.ts { - ts.reset(now) - } -} diff --git a/benchmark/stats/histogram.go b/benchmark/stats/histogram.go index 36dbc4fa..099bcd65 100644 --- a/benchmark/stats/histogram.go +++ b/benchmark/stats/histogram.go @@ -4,14 +4,15 @@ import ( "bytes" "fmt" "io" + "log" "math" "strconv" "strings" - "time" ) -// HistogramValue is the value of Histogram objects. -type HistogramValue struct { +// Histogram accumulates values in the form of a histogram with +// exponentially increased bucket sizes. +type Histogram struct { // Count is the total number of values added to the histogram. Count int64 // Sum is the sum of all the values added to the histogram. @@ -24,9 +25,30 @@ type HistogramValue struct { Max int64 // Buckets contains all the buckets of the histogram. Buckets []HistogramBucket + + opts HistogramOptions + logBaseBucketSize float64 + oneOverLogOnePlusGrowthFactor float64 } -// HistogramBucket is one histogram bucket. +// HistogramOptions contains the parameters that define the histogram's buckets. +// The first bucket of the created histogram (with index 0) contains [min, min+n) +// where n = BaseBucketSize, min = MinValue. +// Bucket i (i>=1) contains [min + n * m^(i-1), min + n * m^i), where m = 1+GrowthFactor. +// The type of the values is int64. +type HistogramOptions struct { + // NumBuckets is the number of buckets. + NumBuckets int + // GrowthFactor is the growth factor of the buckets. A value of 0.1 + // indicates that bucket N+1 will be 10% larger than bucket N. + GrowthFactor float64 + // BaseBucketSize is the size of the first bucket. + BaseBucketSize float64 + // MinValue is the lower bound of the first bucket. + MinValue int64 +} + +// HistogramBucket represents one histogram bucket. type HistogramBucket struct { // LowBound is the lower bound of the bucket. LowBound float64 @@ -34,28 +56,56 @@ type HistogramBucket struct { Count int64 } +// NewHistogram returns a pointer to a new Histogram object that was created +// with the provided options. +func NewHistogram(opts HistogramOptions) *Histogram { + if opts.NumBuckets == 0 { + opts.NumBuckets = 32 + } + if opts.BaseBucketSize == 0.0 { + opts.BaseBucketSize = 1.0 + } + h := Histogram{ + Buckets: make([]HistogramBucket, opts.NumBuckets), + Min: math.MaxInt64, + Max: math.MinInt64, + + opts: opts, + logBaseBucketSize: math.Log(opts.BaseBucketSize), + oneOverLogOnePlusGrowthFactor: 1 / math.Log(1+opts.GrowthFactor), + } + m := 1.0 + opts.GrowthFactor + delta := opts.BaseBucketSize + h.Buckets[0].LowBound = float64(opts.MinValue) + for i := 1; i < opts.NumBuckets; i++ { + h.Buckets[i].LowBound = float64(opts.MinValue) + delta + delta = delta * m + } + return &h +} + // Print writes textual output of the histogram values. -func (v HistogramValue) Print(w io.Writer) { - avg := float64(v.Sum) / float64(v.Count) - fmt.Fprintf(w, "Count: %d Min: %d Max: %d Avg: %.2f\n", v.Count, v.Min, v.Max, avg) +func (h *Histogram) Print(w io.Writer) { + avg := float64(h.Sum) / float64(h.Count) + fmt.Fprintf(w, "Count: %d Min: %d Max: %d Avg: %.2f\n", h.Count, h.Min, h.Max, avg) fmt.Fprintf(w, "%s\n", strings.Repeat("-", 60)) - if v.Count <= 0 { + if h.Count <= 0 { return } - maxBucketDigitLen := len(strconv.FormatFloat(v.Buckets[len(v.Buckets)-1].LowBound, 'f', 6, 64)) + maxBucketDigitLen := len(strconv.FormatFloat(h.Buckets[len(h.Buckets)-1].LowBound, 'f', 6, 64)) if maxBucketDigitLen < 3 { // For "inf". maxBucketDigitLen = 3 } - maxCountDigitLen := len(strconv.FormatInt(v.Count, 10)) - percentMulti := 100 / float64(v.Count) + maxCountDigitLen := len(strconv.FormatInt(h.Count, 10)) + percentMulti := 100 / float64(h.Count) accCount := int64(0) - for i, b := range v.Buckets { + for i, b := range h.Buckets { fmt.Fprintf(w, "[%*f, ", maxBucketDigitLen, b.LowBound) - if i+1 < len(v.Buckets) { - fmt.Fprintf(w, "%*f)", maxBucketDigitLen, v.Buckets[i+1].LowBound) + if i+1 < len(h.Buckets) { + fmt.Fprintf(w, "%*f)", maxBucketDigitLen, h.Buckets[i+1].LowBound) } else { fmt.Fprintf(w, "%*s)", maxBucketDigitLen, "inf") } @@ -70,89 +120,21 @@ func (v HistogramValue) Print(w io.Writer) { } // String returns the textual output of the histogram values as string. -func (v HistogramValue) String() string { +func (h *Histogram) String() string { var b bytes.Buffer - v.Print(&b) + h.Print(&b) return b.String() } -// Histogram accumulates values in the form of a histogram with -// exponentially increased bucket sizes. -// The first bucket (with index 0) is [0, n) where n = baseBucketSize. -// Bucket i (i>=1) contains [n * m^(i-1), n * m^i), where m = 1 + GrowthFactor. -// The type of the values is int64. -type Histogram struct { - opts HistogramOptions - buckets []bucketInternal - count *Counter - sum *Counter - sumOfSquares *Counter - tracker *Tracker - - logBaseBucketSize float64 - oneOverLogOnePlusGrowthFactor float64 -} - -// HistogramOptions contains the parameters that define the histogram's buckets. -type HistogramOptions struct { - // NumBuckets is the number of buckets. - NumBuckets int - // GrowthFactor is the growth factor of the buckets. A value of 0.1 - // indicates that bucket N+1 will be 10% larger than bucket N. - GrowthFactor float64 - // BaseBucketSize is the size of the first bucket. - BaseBucketSize float64 - // MinValue is the lower bound of the first bucket. - MinValue int64 -} - -// bucketInternal is the internal representation of a bucket, which includes a -// rate counter. -type bucketInternal struct { - lowBound float64 - count *Counter -} - -// NewHistogram returns a pointer to a new Histogram object that was created -// with the provided options. -func NewHistogram(opts HistogramOptions) *Histogram { - if opts.NumBuckets == 0 { - opts.NumBuckets = 32 - } - if opts.BaseBucketSize == 0.0 { - opts.BaseBucketSize = 1.0 - } - h := Histogram{ - opts: opts, - buckets: make([]bucketInternal, opts.NumBuckets), - count: newCounter(), - sum: newCounter(), - sumOfSquares: newCounter(), - tracker: newTracker(), - - logBaseBucketSize: math.Log(opts.BaseBucketSize), - oneOverLogOnePlusGrowthFactor: 1 / math.Log(1+opts.GrowthFactor), - } - m := 1.0 + opts.GrowthFactor - delta := opts.BaseBucketSize - h.buckets[0].lowBound = float64(opts.MinValue) - h.buckets[0].count = newCounter() - for i := 1; i < opts.NumBuckets; i++ { - h.buckets[i].lowBound = float64(opts.MinValue) + delta - h.buckets[i].count = newCounter() - delta = delta * m - } - return &h -} - // Clear resets all the content of histogram. func (h *Histogram) Clear() { - h.count = newCounter() - h.sum = newCounter() - h.sumOfSquares = newCounter() - h.tracker = newTracker() - for _, v := range h.buckets { - v.count = newCounter() + h.Count = 0 + h.Sum = 0 + h.SumOfSquares = 0 + h.Min = math.MaxInt64 + h.Max = math.MinInt64 + for _, v := range h.Buckets { + v.Count = 0 } } @@ -167,103 +149,19 @@ func (h *Histogram) Add(value int64) error { if err != nil { return err } - h.buckets[bucket].count.Incr(1) - h.count.Incr(1) - h.sum.Incr(value) - h.sumOfSquares.Incr(value * value) - h.tracker.Push(value) + h.Buckets[bucket].Count++ + h.Count++ + h.Sum += value + h.SumOfSquares += value * value + if value < h.Min { + h.Min = value + } + if value > h.Max { + h.Max = value + } return nil } -// LastUpdate returns the time at which the object was last updated. -func (h *Histogram) LastUpdate() time.Time { - return h.count.LastUpdate() -} - -// Value returns the accumulated state of the histogram since it was created. -func (h *Histogram) Value() HistogramValue { - b := make([]HistogramBucket, len(h.buckets)) - for i, v := range h.buckets { - b[i] = HistogramBucket{ - LowBound: v.lowBound, - Count: v.count.Value(), - } - } - - v := HistogramValue{ - Count: h.count.Value(), - Sum: h.sum.Value(), - SumOfSquares: h.sumOfSquares.Value(), - Min: h.tracker.Min(), - Max: h.tracker.Max(), - Buckets: b, - } - return v -} - -// Delta1h returns the change in the last hour. -func (h *Histogram) Delta1h() HistogramValue { - b := make([]HistogramBucket, len(h.buckets)) - for i, v := range h.buckets { - b[i] = HistogramBucket{ - LowBound: v.lowBound, - Count: v.count.Delta1h(), - } - } - - v := HistogramValue{ - Count: h.count.Delta1h(), - Sum: h.sum.Delta1h(), - SumOfSquares: h.sumOfSquares.Delta1h(), - Min: h.tracker.Min1h(), - Max: h.tracker.Max1h(), - Buckets: b, - } - return v -} - -// Delta10m returns the change in the last 10 minutes. -func (h *Histogram) Delta10m() HistogramValue { - b := make([]HistogramBucket, len(h.buckets)) - for i, v := range h.buckets { - b[i] = HistogramBucket{ - LowBound: v.lowBound, - Count: v.count.Delta10m(), - } - } - - v := HistogramValue{ - Count: h.count.Delta10m(), - Sum: h.sum.Delta10m(), - SumOfSquares: h.sumOfSquares.Delta10m(), - Min: h.tracker.Min10m(), - Max: h.tracker.Max10m(), - Buckets: b, - } - return v -} - -// Delta1m returns the change in the last 10 minutes. -func (h *Histogram) Delta1m() HistogramValue { - b := make([]HistogramBucket, len(h.buckets)) - for i, v := range h.buckets { - b[i] = HistogramBucket{ - LowBound: v.lowBound, - Count: v.count.Delta1m(), - } - } - - v := HistogramValue{ - Count: h.count.Delta1m(), - Sum: h.sum.Delta1m(), - SumOfSquares: h.sumOfSquares.Delta1m(), - Min: h.tracker.Min1m(), - Max: h.tracker.Max1m(), - Buckets: b, - } - return v -} - func (h *Histogram) findBucket(value int64) (int, error) { delta := float64(value - h.opts.MinValue) var b int @@ -273,8 +171,28 @@ func (h *Histogram) findBucket(value int64) (int, error) { // = (log(delta) - log(baseBucketSize)) * (1 / log(1+growthFactor)) + 1 b = int((math.Log(delta)-h.logBaseBucketSize)*h.oneOverLogOnePlusGrowthFactor + 1) } - if b >= len(h.buckets) { + if b >= len(h.Buckets) { return 0, fmt.Errorf("no bucket for value: %d", value) } return b, nil } + +// Merge takes another histogram h2, and merges its content into h. +// The two histograms must be created by equivalent HistogramOptions. +func (h *Histogram) Merge(h2 *Histogram) { + if h.opts != h2.opts { + log.Fatalf("failed to merge histograms, created by inequivalent options") + } + h.Count += h2.Count + h.Sum += h2.Sum + h.SumOfSquares += h2.SumOfSquares + if h2.Min < h.Min { + h.Min = h2.Min + } + if h2.Max > h.Max { + h.Max = h2.Max + } + for i, b := range h2.Buckets { + h.Buckets[i].Count += b.Count + } +} diff --git a/benchmark/stats/stats.go b/benchmark/stats/stats.go index eef620d4..e0edb174 100644 --- a/benchmark/stats/stats.go +++ b/benchmark/stats/stats.go @@ -104,7 +104,7 @@ func (stats *Stats) Print(w io.Writer) { fmt.Fprint(w, "Histogram (empty)\n") } else { fmt.Fprintf(w, "Histogram (unit: %s)\n", fmt.Sprintf("%v", stats.unit)[1:]) - stats.histogram.Value().Print(w) + stats.histogram.Print(w) } } diff --git a/benchmark/stats/timeseries.go b/benchmark/stats/timeseries.go deleted file mode 100644 index 2ba18a4d..00000000 --- a/benchmark/stats/timeseries.go +++ /dev/null @@ -1,154 +0,0 @@ -package stats - -import ( - "math" - "time" -) - -// timeseries holds the history of a changing value over a predefined period of -// time. -type timeseries struct { - size int // The number of time slots. Equivalent to len(slots). - resolution time.Duration // The time resolution of each slot. - stepCount int64 // The number of intervals seen since creation. - head int // The position of the current time in slots. - time time.Time // The time at the beginning of the current time slot. - slots []int64 // A circular buffer of time slots. -} - -// newTimeSeries returns a newly allocated timeseries that covers the requested -// period with the given resolution. -func newTimeSeries(initialTime time.Time, period, resolution time.Duration) *timeseries { - size := int(period.Nanoseconds()/resolution.Nanoseconds()) + 1 - return ×eries{ - size: size, - resolution: resolution, - stepCount: 1, - time: initialTime, - slots: make([]int64, size), - } -} - -// advanceTimeWithFill moves the timeseries forward to time t and fills in any -// slots that get skipped in the process with the given value. Values older than -// the timeseries period are lost. -func (ts *timeseries) advanceTimeWithFill(t time.Time, value int64) { - advanceTo := t.Truncate(ts.resolution) - if !advanceTo.After(ts.time) { - // This is shortcut for the most common case of a busy counter - // where updates come in many times per ts.resolution. - ts.time = advanceTo - return - } - steps := int(advanceTo.Sub(ts.time).Nanoseconds() / ts.resolution.Nanoseconds()) - ts.stepCount += int64(steps) - if steps > ts.size { - steps = ts.size - } - for steps > 0 { - ts.head = (ts.head + 1) % ts.size - ts.slots[ts.head] = value - steps-- - } - ts.time = advanceTo -} - -// advanceTime moves the timeseries forward to time t and fills in any slots -// that get skipped in the process with the head value. Values older than the -// timeseries period are lost. -func (ts *timeseries) advanceTime(t time.Time) { - ts.advanceTimeWithFill(t, ts.slots[ts.head]) -} - -// set sets the current value of the timeseries. -func (ts *timeseries) set(value int64) { - ts.slots[ts.head] = value -} - -// incr sets the current value of the timeseries. -func (ts *timeseries) incr(delta int64) { - ts.slots[ts.head] += delta -} - -// headValue returns the latest value from the timeseries. -func (ts *timeseries) headValue() int64 { - return ts.slots[ts.head] -} - -// headTime returns the time of the latest value from the timeseries. -func (ts *timeseries) headTime() time.Time { - return ts.time -} - -// tailValue returns the oldest value from the timeseries. -func (ts *timeseries) tailValue() int64 { - if ts.stepCount < int64(ts.size) { - return 0 - } - return ts.slots[(ts.head+1)%ts.size] -} - -// tailTime returns the time of the oldest value from the timeseries. -func (ts *timeseries) tailTime() time.Time { - size := int64(ts.size) - if ts.stepCount < size { - size = ts.stepCount - } - return ts.time.Add(-time.Duration(size-1) * ts.resolution) -} - -// delta returns the difference between the newest and oldest values from the -// timeseries. -func (ts *timeseries) delta() int64 { - return ts.headValue() - ts.tailValue() -} - -// rate returns the rate of change between the oldest and newest values from -// the timeseries in units per second. -func (ts *timeseries) rate() float64 { - deltaTime := ts.headTime().Sub(ts.tailTime()).Seconds() - if deltaTime == 0 { - return 0 - } - return float64(ts.delta()) / deltaTime -} - -// min returns the smallest value from the timeseries. -func (ts *timeseries) min() int64 { - to := ts.size - if ts.stepCount < int64(ts.size) { - to = ts.head + 1 - } - tail := (ts.head + 1) % ts.size - min := int64(math.MaxInt64) - for b := 0; b < to; b++ { - if b != tail && ts.slots[b] < min { - min = ts.slots[b] - } - } - return min -} - -// max returns the largest value from the timeseries. -func (ts *timeseries) max() int64 { - to := ts.size - if ts.stepCount < int64(ts.size) { - to = ts.head + 1 - } - tail := (ts.head + 1) % ts.size - max := int64(math.MinInt64) - for b := 0; b < to; b++ { - if b != tail && ts.slots[b] > max { - max = ts.slots[b] - } - } - return max -} - -// reset resets the timeseries to an empty state. -func (ts *timeseries) reset(t time.Time) { - ts.head = 0 - ts.time = t - ts.stepCount = 1 - ts.slots = make([]int64, ts.size) -} diff --git a/benchmark/stats/tracker.go b/benchmark/stats/tracker.go deleted file mode 100644 index 802f7295..00000000 --- a/benchmark/stats/tracker.go +++ /dev/null @@ -1,159 +0,0 @@ -package stats - -import ( - "math" - "sync" - "time" -) - -// Tracker is a min/max value tracker that keeps track of its min/max values -// over a given period of time, and with a given resolution. The initial min -// and max values are math.MaxInt64 and math.MinInt64 respectively. -type Tracker struct { - mu sync.RWMutex - min, max int64 // All time min/max. - minTS, maxTS [3]*timeseries - lastUpdate time.Time -} - -// newTracker returns a new Tracker. -func newTracker() *Tracker { - now := TimeNow() - t := &Tracker{} - t.minTS[hour] = newTimeSeries(now, time.Hour, time.Minute) - t.minTS[tenminutes] = newTimeSeries(now, 10*time.Minute, 10*time.Second) - t.minTS[minute] = newTimeSeries(now, time.Minute, time.Second) - t.maxTS[hour] = newTimeSeries(now, time.Hour, time.Minute) - t.maxTS[tenminutes] = newTimeSeries(now, 10*time.Minute, 10*time.Second) - t.maxTS[minute] = newTimeSeries(now, time.Minute, time.Second) - t.init() - return t -} - -func (t *Tracker) init() { - t.min = math.MaxInt64 - t.max = math.MinInt64 - for _, ts := range t.minTS { - ts.set(math.MaxInt64) - } - for _, ts := range t.maxTS { - ts.set(math.MinInt64) - } -} - -func (t *Tracker) advance() time.Time { - now := TimeNow() - for _, ts := range t.minTS { - ts.advanceTimeWithFill(now, math.MaxInt64) - } - for _, ts := range t.maxTS { - ts.advanceTimeWithFill(now, math.MinInt64) - } - return now -} - -// LastUpdate returns the last update time of the range. -func (t *Tracker) LastUpdate() time.Time { - t.mu.RLock() - defer t.mu.RUnlock() - return t.lastUpdate -} - -// Push adds a new value if it is a new minimum or maximum. -func (t *Tracker) Push(value int64) { - t.mu.Lock() - defer t.mu.Unlock() - t.lastUpdate = t.advance() - if t.min > value { - t.min = value - } - if t.max < value { - t.max = value - } - for _, ts := range t.minTS { - if ts.headValue() > value { - ts.set(value) - } - } - for _, ts := range t.maxTS { - if ts.headValue() < value { - ts.set(value) - } - } -} - -// Min returns the minimum value of the tracker -func (t *Tracker) Min() int64 { - t.mu.RLock() - defer t.mu.RUnlock() - return t.min -} - -// Max returns the maximum value of the tracker. -func (t *Tracker) Max() int64 { - t.mu.RLock() - defer t.mu.RUnlock() - return t.max -} - -// Min1h returns the minimum value for the last hour. -func (t *Tracker) Min1h() int64 { - t.mu.Lock() - defer t.mu.Unlock() - t.advance() - return t.minTS[hour].min() -} - -// Max1h returns the maximum value for the last hour. -func (t *Tracker) Max1h() int64 { - t.mu.Lock() - defer t.mu.Unlock() - t.advance() - return t.maxTS[hour].max() -} - -// Min10m returns the minimum value for the last 10 minutes. -func (t *Tracker) Min10m() int64 { - t.mu.Lock() - defer t.mu.Unlock() - t.advance() - return t.minTS[tenminutes].min() -} - -// Max10m returns the maximum value for the last 10 minutes. -func (t *Tracker) Max10m() int64 { - t.mu.Lock() - defer t.mu.Unlock() - t.advance() - return t.maxTS[tenminutes].max() -} - -// Min1m returns the minimum value for the last 1 minute. -func (t *Tracker) Min1m() int64 { - t.mu.Lock() - defer t.mu.Unlock() - t.advance() - return t.minTS[minute].min() -} - -// Max1m returns the maximum value for the last 1 minute. -func (t *Tracker) Max1m() int64 { - t.mu.Lock() - defer t.mu.Unlock() - t.advance() - return t.maxTS[minute].max() -} - -// Reset resets the range to an empty state. -func (t *Tracker) Reset() { - t.mu.Lock() - defer t.mu.Unlock() - now := TimeNow() - for _, ts := range t.minTS { - ts.reset(now) - } - for _, ts := range t.maxTS { - ts.reset(now) - } - t.init() -} diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go index 0067d5eb..77e522f2 100644 --- a/benchmark/worker/benchmark_client.go +++ b/benchmark/worker/benchmark_client.go @@ -53,12 +53,38 @@ var ( caFile = "benchmark/server/testdata/ca.pem" ) +type lockingHistogram struct { + mu sync.Mutex + histogram *stats.Histogram +} + +func (h *lockingHistogram) add(value int64) { + h.mu.Lock() + defer h.mu.Unlock() + h.histogram.Add(value) +} + +// swap sets h.histogram to new, and returns its old value. +func (h *lockingHistogram) swap(new *stats.Histogram) *stats.Histogram { + h.mu.Lock() + defer h.mu.Unlock() + old := h.histogram + h.histogram = new + return old +} + +func (h *lockingHistogram) mergeInto(merged *stats.Histogram) { + h.mu.Lock() + defer h.mu.Unlock() + merged.Merge(h.histogram) +} + type benchmarkClient struct { - closeConns func() - stop chan bool - mu sync.RWMutex - lastResetTime time.Time - histogram *stats.Histogram + closeConns func() + stop chan bool + lastResetTime time.Time + histogramOptions stats.HistogramOptions + lockingHistograms []lockingHistogram } func printClientConfig(config *testpb.ClientConfig) { @@ -131,7 +157,7 @@ func createConns(config *testpb.ClientConfig) ([]*grpc.ClientConn, func(), error // Create connections. connCount := int(config.ClientChannels) - conns := make([]*grpc.ClientConn, connCount) + conns := make([]*grpc.ClientConn, connCount, connCount) for connIndex := 0; connIndex < connCount; connIndex++ { conns[connIndex] = benchmark.NewClientConn(config.ServerTargets[connIndex%len(config.ServerTargets)], opts...) } @@ -200,13 +226,16 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) return nil, err } + rpcCountPerConn := int(config.OutstandingRpcsPerChannel) bc := &benchmarkClient{ - histogram: stats.NewHistogram(stats.HistogramOptions{ + histogramOptions: stats.HistogramOptions{ NumBuckets: int(math.Log(config.HistogramParams.MaxPossible)/math.Log(1+config.HistogramParams.Resolution)) + 1, GrowthFactor: config.HistogramParams.Resolution, BaseBucketSize: (1 + config.HistogramParams.Resolution), MinValue: 0, - }), + }, + lockingHistograms: make([]lockingHistogram, rpcCountPerConn*len(conns), rpcCountPerConn*len(conns)), + stop: make(chan bool), lastResetTime: time.Now(), closeConns: closeConns, @@ -222,11 +251,15 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) } func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int) { - for _, conn := range conns { + for ic, conn := range conns { client := testpb.NewBenchmarkServiceClient(conn) // For each connection, create rpcCountPerConn goroutines to do rpc. for j := 0; j < rpcCountPerConn; j++ { - go func() { + // Create histogram for each goroutine. + idx := ic*rpcCountPerConn + j + bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions) + // Start goroutine on the created mutex and histogram. + go func(idx int) { // TODO: do warm up if necessary. // Now relying on worker client to reserve time to do warm up. // The worker client needs to wait for some time after client is created, @@ -243,9 +276,7 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe return } elapse := time.Since(start) - bc.mu.Lock() - bc.histogram.Add(int64(elapse)) - bc.mu.Unlock() + bc.lockingHistograms[idx].add(int64(elapse)) select { case <-bc.stop: case done <- true: @@ -257,7 +288,7 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe case <-done: } } - }() + }(idx) } } } @@ -269,7 +300,7 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou } else { doRPC = benchmark.DoStreamingRoundTrip } - for _, conn := range conns { + for ic, conn := range conns { // For each connection, create rpcCountPerConn goroutines to do rpc. for j := 0; j < rpcCountPerConn; j++ { c := testpb.NewBenchmarkServiceClient(conn) @@ -277,8 +308,11 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou if err != nil { grpclog.Fatalf("%v.StreamingCall(_) = _, %v", c, err) } - // Create benchmark rpc goroutine. - go func() { + // Create histogram for each goroutine. + idx := ic*rpcCountPerConn + j + bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions) + // Start goroutine on the created mutex and histogram. + go func(idx int) { // TODO: do warm up if necessary. // Now relying on worker client to reserve time to do warm up. // The worker client needs to wait for some time after client is created, @@ -295,9 +329,7 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou return } elapse := time.Since(start) - bc.mu.Lock() - bc.histogram.Add(int64(elapse)) - bc.mu.Unlock() + bc.lockingHistograms[idx].add(int64(elapse)) select { case <-bc.stop: case done <- true: @@ -309,29 +341,51 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou case <-done: } } - }() + }(idx) } } } -func (bc *benchmarkClient) getStats() *testpb.ClientStats { - bc.mu.RLock() - defer bc.mu.RUnlock() - timeElapsed := time.Since(bc.lastResetTime).Seconds() +// getStats returns the stats for benchmark client. +// It resets lastResetTime and all histograms if argument reset is true. +func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats { + var timeElapsed float64 + mergedHistogram := stats.NewHistogram(bc.histogramOptions) - histogramValue := bc.histogram.Value() - b := make([]uint32, len(histogramValue.Buckets)) - for i, v := range histogramValue.Buckets { + if reset { + // Merging histogram may take some time. + // Put all histograms aside and merge later. + toMerge := make([]*stats.Histogram, len(bc.lockingHistograms), len(bc.lockingHistograms)) + for i := range bc.lockingHistograms { + toMerge[i] = bc.lockingHistograms[i].swap(stats.NewHistogram(bc.histogramOptions)) + } + + for i := 0; i < len(toMerge); i++ { + mergedHistogram.Merge(toMerge[i]) + } + + timeElapsed = time.Since(bc.lastResetTime).Seconds() + bc.lastResetTime = time.Now() + } else { + // Merge only, not reset. + for i := range bc.lockingHistograms { + bc.lockingHistograms[i].mergeInto(mergedHistogram) + } + timeElapsed = time.Since(bc.lastResetTime).Seconds() + } + + b := make([]uint32, len(mergedHistogram.Buckets), len(mergedHistogram.Buckets)) + for i, v := range mergedHistogram.Buckets { b[i] = uint32(v.Count) } return &testpb.ClientStats{ Latencies: &testpb.HistogramData{ Bucket: b, - MinSeen: float64(histogramValue.Min), - MaxSeen: float64(histogramValue.Max), - Sum: float64(histogramValue.Sum), - SumOfSquares: float64(histogramValue.SumOfSquares), - Count: float64(histogramValue.Count), + MinSeen: float64(mergedHistogram.Min), + MaxSeen: float64(mergedHistogram.Max), + Sum: float64(mergedHistogram.Sum), + SumOfSquares: float64(mergedHistogram.SumOfSquares), + Count: float64(mergedHistogram.Count), }, TimeElapsed: timeElapsed, TimeUser: 0, @@ -339,15 +393,6 @@ func (bc *benchmarkClient) getStats() *testpb.ClientStats { } } -// reset clears the contents for histogram and set lastResetTime to Now(). -// It is called to get ready for benchmark runs. -func (bc *benchmarkClient) reset() { - bc.mu.Lock() - defer bc.mu.Unlock() - bc.lastResetTime = time.Now() - bc.histogram.Clear() -} - func (bc *benchmarkClient) shutdown() { close(bc.stop) bc.closeConns() diff --git a/benchmark/worker/benchmark_server.go b/benchmark/worker/benchmark_server.go index 0a928a11..667ef2c1 100644 --- a/benchmark/worker/benchmark_server.go +++ b/benchmark/worker/benchmark_server.go @@ -159,15 +159,15 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma return &benchmarkServer{port: p, cores: numOfCores, closeFunc: closeFunc, lastResetTime: time.Now()}, nil } -func (bs *benchmarkServer) getStats() *testpb.ServerStats { +// getStats returns the stats for benchmark server. +// It resets lastResetTime if argument reset is true. +func (bs *benchmarkServer) getStats(reset bool) *testpb.ServerStats { // TODO wall time, sys time, user time. bs.mu.RLock() defer bs.mu.RUnlock() - return &testpb.ServerStats{TimeElapsed: time.Since(bs.lastResetTime).Seconds(), TimeUser: 0, TimeSystem: 0} -} - -func (bs *benchmarkServer) reset() { - bs.mu.Lock() - defer bs.mu.Unlock() - bs.lastResetTime = time.Now() + timeElapsed := time.Since(bs.lastResetTime).Seconds() + if reset { + bs.lastResetTime = time.Now() + } + return &testpb.ServerStats{TimeElapsed: timeElapsed, TimeUser: 0, TimeSystem: 0} } diff --git a/benchmark/worker/main.go b/benchmark/worker/main.go index 236ca8df..8fc2160c 100644 --- a/benchmark/worker/main.go +++ b/benchmark/worker/main.go @@ -116,7 +116,7 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er return err } out = &testpb.ServerStatus{ - Stats: bs.getStats(), + Stats: bs.getStats(false), Port: int32(bs.port), Cores: int32(bs.cores), } @@ -128,13 +128,10 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er return grpc.Errorf(codes.InvalidArgument, "server does not exist when mark received") } out = &testpb.ServerStatus{ - Stats: bs.getStats(), + Stats: bs.getStats(argtype.Mark.Reset_), Port: int32(bs.port), Cores: int32(bs.cores), } - if argtype.Mark.Reset_ { - bs.reset() - } } if err := stream.Send(out); err != nil { @@ -176,7 +173,7 @@ func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) er return err } out = &testpb.ClientStatus{ - Stats: bc.getStats(), + Stats: bc.getStats(false), } case *testpb.ClientArgs_Mark: @@ -186,10 +183,7 @@ func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) er return grpc.Errorf(codes.InvalidArgument, "client does not exist when mark received") } out = &testpb.ClientStatus{ - Stats: bc.getStats(), - } - if t.Mark.Reset_ { - bc.reset() + Stats: bc.getStats(t.Mark.Reset_), } }