From d846ac94b3babb1127a8e9a3acdd182f67950271 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 4 May 2016 17:47:55 -0700 Subject: [PATCH 01/17] Simplify histogram --- benchmark/stats/counter.go | 135 ----------------------------- benchmark/stats/histogram.go | 131 +++++++--------------------- benchmark/stats/timeseries.go | 154 -------------------------------- benchmark/stats/tracker.go | 159 ---------------------------------- 4 files changed, 33 insertions(+), 546 deletions(-) delete mode 100644 benchmark/stats/counter.go delete mode 100644 benchmark/stats/timeseries.go delete mode 100644 benchmark/stats/tracker.go diff --git a/benchmark/stats/counter.go b/benchmark/stats/counter.go deleted file mode 100644 index 4389bae3..00000000 --- a/benchmark/stats/counter.go +++ /dev/null @@ -1,135 +0,0 @@ -package stats - -import ( - "sync" - "time" -) - -var ( - // TimeNow is used for testing. - TimeNow = time.Now -) - -const ( - hour = 0 - tenminutes = 1 - minute = 2 -) - -// Counter is a counter that keeps track of its recent values over a given -// period of time, and with a given resolution. Use newCounter() to instantiate. -type Counter struct { - mu sync.RWMutex - ts [3]*timeseries - lastUpdate time.Time -} - -// newCounter returns a new Counter. -func newCounter() *Counter { - now := TimeNow() - c := &Counter{} - c.ts[hour] = newTimeSeries(now, time.Hour, time.Minute) - c.ts[tenminutes] = newTimeSeries(now, 10*time.Minute, 10*time.Second) - c.ts[minute] = newTimeSeries(now, time.Minute, time.Second) - return c -} - -func (c *Counter) advance() time.Time { - now := TimeNow() - for _, ts := range c.ts { - ts.advanceTime(now) - } - return now -} - -// Value returns the current value of the counter. -func (c *Counter) Value() int64 { - c.mu.RLock() - defer c.mu.RUnlock() - return c.ts[minute].headValue() -} - -// LastUpdate returns the last update time of the counter. 
-func (c *Counter) LastUpdate() time.Time { - c.mu.RLock() - defer c.mu.RUnlock() - return c.lastUpdate -} - -// Set updates the current value of the counter. -func (c *Counter) Set(value int64) { - c.mu.Lock() - defer c.mu.Unlock() - c.lastUpdate = c.advance() - for _, ts := range c.ts { - ts.set(value) - } -} - -// Incr increments the current value of the counter by 'delta'. -func (c *Counter) Incr(delta int64) { - c.mu.Lock() - defer c.mu.Unlock() - c.lastUpdate = c.advance() - for _, ts := range c.ts { - ts.incr(delta) - } -} - -// Delta1h returns the delta for the last hour. -func (c *Counter) Delta1h() int64 { - c.mu.RLock() - defer c.mu.RUnlock() - c.advance() - return c.ts[hour].delta() -} - -// Delta10m returns the delta for the last 10 minutes. -func (c *Counter) Delta10m() int64 { - c.mu.RLock() - defer c.mu.RUnlock() - c.advance() - return c.ts[tenminutes].delta() -} - -// Delta1m returns the delta for the last minute. -func (c *Counter) Delta1m() int64 { - c.mu.RLock() - defer c.mu.RUnlock() - c.advance() - return c.ts[minute].delta() -} - -// Rate1h returns the rate of change of the counter in the last hour. -func (c *Counter) Rate1h() float64 { - c.mu.RLock() - defer c.mu.RUnlock() - c.advance() - return c.ts[hour].rate() -} - -// Rate10m returns the rate of change of the counter in the last 10 minutes. -func (c *Counter) Rate10m() float64 { - c.mu.RLock() - defer c.mu.RUnlock() - c.advance() - return c.ts[tenminutes].rate() -} - -// Rate1m returns the rate of change of the counter in the last minute. -func (c *Counter) Rate1m() float64 { - c.mu.RLock() - defer c.mu.RUnlock() - c.advance() - return c.ts[minute].rate() -} - -// Reset resets the counter to an empty state. 
-func (c *Counter) Reset() { - c.mu.Lock() - defer c.mu.Unlock() - now := TimeNow() - for _, ts := range c.ts { - ts.reset(now) - } -} diff --git a/benchmark/stats/histogram.go b/benchmark/stats/histogram.go index 36dbc4fa..fea014c2 100644 --- a/benchmark/stats/histogram.go +++ b/benchmark/stats/histogram.go @@ -7,7 +7,6 @@ import ( "math" "strconv" "strings" - "time" ) // HistogramValue is the value of Histogram objects. @@ -84,10 +83,11 @@ func (v HistogramValue) String() string { type Histogram struct { opts HistogramOptions buckets []bucketInternal - count *Counter - sum *Counter - sumOfSquares *Counter - tracker *Tracker + count int64 + sum int64 + sumOfSquares int64 + min int64 + max int64 logBaseBucketSize float64 oneOverLogOnePlusGrowthFactor float64 @@ -110,7 +110,7 @@ type HistogramOptions struct { // rate counter. type bucketInternal struct { lowBound float64 - count *Counter + count int64 } // NewHistogram returns a pointer to a new Histogram object that was created @@ -123,12 +123,10 @@ func NewHistogram(opts HistogramOptions) *Histogram { opts.BaseBucketSize = 1.0 } h := Histogram{ - opts: opts, - buckets: make([]bucketInternal, opts.NumBuckets), - count: newCounter(), - sum: newCounter(), - sumOfSquares: newCounter(), - tracker: newTracker(), + opts: opts, + buckets: make([]bucketInternal, opts.NumBuckets), + min: math.MaxInt64, + max: math.MinInt64, logBaseBucketSize: math.Log(opts.BaseBucketSize), oneOverLogOnePlusGrowthFactor: 1 / math.Log(1+opts.GrowthFactor), @@ -136,10 +134,8 @@ func NewHistogram(opts HistogramOptions) *Histogram { m := 1.0 + opts.GrowthFactor delta := opts.BaseBucketSize h.buckets[0].lowBound = float64(opts.MinValue) - h.buckets[0].count = newCounter() for i := 1; i < opts.NumBuckets; i++ { h.buckets[i].lowBound = float64(opts.MinValue) + delta - h.buckets[i].count = newCounter() delta = delta * m } return &h @@ -147,12 +143,14 @@ func NewHistogram(opts HistogramOptions) *Histogram { // Clear resets all the content of 
histogram. func (h *Histogram) Clear() { - h.count = newCounter() - h.sum = newCounter() - h.sumOfSquares = newCounter() - h.tracker = newTracker() + h.count = 0 + h.sum = 0 + h.sumOfSquares = 0 + h.max = 0 + h.min = math.MaxInt64 + h.max = math.MinInt64 for _, v := range h.buckets { - v.count = newCounter() + v.count = 0 } } @@ -167,98 +165,35 @@ func (h *Histogram) Add(value int64) error { if err != nil { return err } - h.buckets[bucket].count.Incr(1) - h.count.Incr(1) - h.sum.Incr(value) - h.sumOfSquares.Incr(value * value) - h.tracker.Push(value) + h.buckets[bucket].count++ + h.count++ + h.sum += value + h.sumOfSquares += value * value + if value < h.min { + h.min = value + } + if value > h.max { + h.max = value + } return nil } -// LastUpdate returns the time at which the object was last updated. -func (h *Histogram) LastUpdate() time.Time { - return h.count.LastUpdate() -} - // Value returns the accumulated state of the histogram since it was created. func (h *Histogram) Value() HistogramValue { b := make([]HistogramBucket, len(h.buckets)) for i, v := range h.buckets { b[i] = HistogramBucket{ LowBound: v.lowBound, - Count: v.count.Value(), + Count: v.count, } } v := HistogramValue{ - Count: h.count.Value(), - Sum: h.sum.Value(), - SumOfSquares: h.sumOfSquares.Value(), - Min: h.tracker.Min(), - Max: h.tracker.Max(), - Buckets: b, - } - return v -} - -// Delta1h returns the change in the last hour. -func (h *Histogram) Delta1h() HistogramValue { - b := make([]HistogramBucket, len(h.buckets)) - for i, v := range h.buckets { - b[i] = HistogramBucket{ - LowBound: v.lowBound, - Count: v.count.Delta1h(), - } - } - - v := HistogramValue{ - Count: h.count.Delta1h(), - Sum: h.sum.Delta1h(), - SumOfSquares: h.sumOfSquares.Delta1h(), - Min: h.tracker.Min1h(), - Max: h.tracker.Max1h(), - Buckets: b, - } - return v -} - -// Delta10m returns the change in the last 10 minutes. 
-func (h *Histogram) Delta10m() HistogramValue { - b := make([]HistogramBucket, len(h.buckets)) - for i, v := range h.buckets { - b[i] = HistogramBucket{ - LowBound: v.lowBound, - Count: v.count.Delta10m(), - } - } - - v := HistogramValue{ - Count: h.count.Delta10m(), - Sum: h.sum.Delta10m(), - SumOfSquares: h.sumOfSquares.Delta10m(), - Min: h.tracker.Min10m(), - Max: h.tracker.Max10m(), - Buckets: b, - } - return v -} - -// Delta1m returns the change in the last 10 minutes. -func (h *Histogram) Delta1m() HistogramValue { - b := make([]HistogramBucket, len(h.buckets)) - for i, v := range h.buckets { - b[i] = HistogramBucket{ - LowBound: v.lowBound, - Count: v.count.Delta1m(), - } - } - - v := HistogramValue{ - Count: h.count.Delta1m(), - Sum: h.sum.Delta1m(), - SumOfSquares: h.sumOfSquares.Delta1m(), - Min: h.tracker.Min1m(), - Max: h.tracker.Max1m(), + Count: h.count, + Sum: h.sum, + SumOfSquares: h.sumOfSquares, + Min: h.min, + Max: h.max, Buckets: b, } return v diff --git a/benchmark/stats/timeseries.go b/benchmark/stats/timeseries.go deleted file mode 100644 index 2ba18a4d..00000000 --- a/benchmark/stats/timeseries.go +++ /dev/null @@ -1,154 +0,0 @@ -package stats - -import ( - "math" - "time" -) - -// timeseries holds the history of a changing value over a predefined period of -// time. -type timeseries struct { - size int // The number of time slots. Equivalent to len(slots). - resolution time.Duration // The time resolution of each slot. - stepCount int64 // The number of intervals seen since creation. - head int // The position of the current time in slots. - time time.Time // The time at the beginning of the current time slot. - slots []int64 // A circular buffer of time slots. -} - -// newTimeSeries returns a newly allocated timeseries that covers the requested -// period with the given resolution. 
-func newTimeSeries(initialTime time.Time, period, resolution time.Duration) *timeseries { - size := int(period.Nanoseconds()/resolution.Nanoseconds()) + 1 - return ×eries{ - size: size, - resolution: resolution, - stepCount: 1, - time: initialTime, - slots: make([]int64, size), - } -} - -// advanceTimeWithFill moves the timeseries forward to time t and fills in any -// slots that get skipped in the process with the given value. Values older than -// the timeseries period are lost. -func (ts *timeseries) advanceTimeWithFill(t time.Time, value int64) { - advanceTo := t.Truncate(ts.resolution) - if !advanceTo.After(ts.time) { - // This is shortcut for the most common case of a busy counter - // where updates come in many times per ts.resolution. - ts.time = advanceTo - return - } - steps := int(advanceTo.Sub(ts.time).Nanoseconds() / ts.resolution.Nanoseconds()) - ts.stepCount += int64(steps) - if steps > ts.size { - steps = ts.size - } - for steps > 0 { - ts.head = (ts.head + 1) % ts.size - ts.slots[ts.head] = value - steps-- - } - ts.time = advanceTo -} - -// advanceTime moves the timeseries forward to time t and fills in any slots -// that get skipped in the process with the head value. Values older than the -// timeseries period are lost. -func (ts *timeseries) advanceTime(t time.Time) { - ts.advanceTimeWithFill(t, ts.slots[ts.head]) -} - -// set sets the current value of the timeseries. -func (ts *timeseries) set(value int64) { - ts.slots[ts.head] = value -} - -// incr sets the current value of the timeseries. -func (ts *timeseries) incr(delta int64) { - ts.slots[ts.head] += delta -} - -// headValue returns the latest value from the timeseries. -func (ts *timeseries) headValue() int64 { - return ts.slots[ts.head] -} - -// headTime returns the time of the latest value from the timeseries. -func (ts *timeseries) headTime() time.Time { - return ts.time -} - -// tailValue returns the oldest value from the timeseries. 
-func (ts *timeseries) tailValue() int64 { - if ts.stepCount < int64(ts.size) { - return 0 - } - return ts.slots[(ts.head+1)%ts.size] -} - -// tailTime returns the time of the oldest value from the timeseries. -func (ts *timeseries) tailTime() time.Time { - size := int64(ts.size) - if ts.stepCount < size { - size = ts.stepCount - } - return ts.time.Add(-time.Duration(size-1) * ts.resolution) -} - -// delta returns the difference between the newest and oldest values from the -// timeseries. -func (ts *timeseries) delta() int64 { - return ts.headValue() - ts.tailValue() -} - -// rate returns the rate of change between the oldest and newest values from -// the timeseries in units per second. -func (ts *timeseries) rate() float64 { - deltaTime := ts.headTime().Sub(ts.tailTime()).Seconds() - if deltaTime == 0 { - return 0 - } - return float64(ts.delta()) / deltaTime -} - -// min returns the smallest value from the timeseries. -func (ts *timeseries) min() int64 { - to := ts.size - if ts.stepCount < int64(ts.size) { - to = ts.head + 1 - } - tail := (ts.head + 1) % ts.size - min := int64(math.MaxInt64) - for b := 0; b < to; b++ { - if b != tail && ts.slots[b] < min { - min = ts.slots[b] - } - } - return min -} - -// max returns the largest value from the timeseries. -func (ts *timeseries) max() int64 { - to := ts.size - if ts.stepCount < int64(ts.size) { - to = ts.head + 1 - } - tail := (ts.head + 1) % ts.size - max := int64(math.MinInt64) - for b := 0; b < to; b++ { - if b != tail && ts.slots[b] > max { - max = ts.slots[b] - } - } - return max -} - -// reset resets the timeseries to an empty state. 
-func (ts *timeseries) reset(t time.Time) { - ts.head = 0 - ts.time = t - ts.stepCount = 1 - ts.slots = make([]int64, ts.size) -} diff --git a/benchmark/stats/tracker.go b/benchmark/stats/tracker.go deleted file mode 100644 index 802f7295..00000000 --- a/benchmark/stats/tracker.go +++ /dev/null @@ -1,159 +0,0 @@ -package stats - -import ( - "math" - "sync" - "time" -) - -// Tracker is a min/max value tracker that keeps track of its min/max values -// over a given period of time, and with a given resolution. The initial min -// and max values are math.MaxInt64 and math.MinInt64 respectively. -type Tracker struct { - mu sync.RWMutex - min, max int64 // All time min/max. - minTS, maxTS [3]*timeseries - lastUpdate time.Time -} - -// newTracker returns a new Tracker. -func newTracker() *Tracker { - now := TimeNow() - t := &Tracker{} - t.minTS[hour] = newTimeSeries(now, time.Hour, time.Minute) - t.minTS[tenminutes] = newTimeSeries(now, 10*time.Minute, 10*time.Second) - t.minTS[minute] = newTimeSeries(now, time.Minute, time.Second) - t.maxTS[hour] = newTimeSeries(now, time.Hour, time.Minute) - t.maxTS[tenminutes] = newTimeSeries(now, 10*time.Minute, 10*time.Second) - t.maxTS[minute] = newTimeSeries(now, time.Minute, time.Second) - t.init() - return t -} - -func (t *Tracker) init() { - t.min = math.MaxInt64 - t.max = math.MinInt64 - for _, ts := range t.minTS { - ts.set(math.MaxInt64) - } - for _, ts := range t.maxTS { - ts.set(math.MinInt64) - } -} - -func (t *Tracker) advance() time.Time { - now := TimeNow() - for _, ts := range t.minTS { - ts.advanceTimeWithFill(now, math.MaxInt64) - } - for _, ts := range t.maxTS { - ts.advanceTimeWithFill(now, math.MinInt64) - } - return now -} - -// LastUpdate returns the last update time of the range. -func (t *Tracker) LastUpdate() time.Time { - t.mu.RLock() - defer t.mu.RUnlock() - return t.lastUpdate -} - -// Push adds a new value if it is a new minimum or maximum. 
-func (t *Tracker) Push(value int64) { - t.mu.Lock() - defer t.mu.Unlock() - t.lastUpdate = t.advance() - if t.min > value { - t.min = value - } - if t.max < value { - t.max = value - } - for _, ts := range t.minTS { - if ts.headValue() > value { - ts.set(value) - } - } - for _, ts := range t.maxTS { - if ts.headValue() < value { - ts.set(value) - } - } -} - -// Min returns the minimum value of the tracker -func (t *Tracker) Min() int64 { - t.mu.RLock() - defer t.mu.RUnlock() - return t.min -} - -// Max returns the maximum value of the tracker. -func (t *Tracker) Max() int64 { - t.mu.RLock() - defer t.mu.RUnlock() - return t.max -} - -// Min1h returns the minimum value for the last hour. -func (t *Tracker) Min1h() int64 { - t.mu.Lock() - defer t.mu.Unlock() - t.advance() - return t.minTS[hour].min() -} - -// Max1h returns the maximum value for the last hour. -func (t *Tracker) Max1h() int64 { - t.mu.Lock() - defer t.mu.Unlock() - t.advance() - return t.maxTS[hour].max() -} - -// Min10m returns the minimum value for the last 10 minutes. -func (t *Tracker) Min10m() int64 { - t.mu.Lock() - defer t.mu.Unlock() - t.advance() - return t.minTS[tenminutes].min() -} - -// Max10m returns the maximum value for the last 10 minutes. -func (t *Tracker) Max10m() int64 { - t.mu.Lock() - defer t.mu.Unlock() - t.advance() - return t.maxTS[tenminutes].max() -} - -// Min1m returns the minimum value for the last 1 minute. -func (t *Tracker) Min1m() int64 { - t.mu.Lock() - defer t.mu.Unlock() - t.advance() - return t.minTS[minute].min() -} - -// Max1m returns the maximum value for the last 1 minute. -func (t *Tracker) Max1m() int64 { - t.mu.Lock() - defer t.mu.Unlock() - t.advance() - return t.maxTS[minute].max() -} - -// Reset resets the range to an empty state. 
-func (t *Tracker) Reset() { - t.mu.Lock() - defer t.mu.Unlock() - now := TimeNow() - for _, ts := range t.minTS { - ts.reset(now) - } - for _, ts := range t.maxTS { - ts.reset(now) - } - t.init() -} From 7bc11a0c29086bc678258e44ce233fb8d0a7fae8 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 4 May 2016 18:08:24 -0700 Subject: [PATCH 02/17] Remove histogramValue --- benchmark/stats/histogram.go | 148 ++++++++++----------------- benchmark/stats/stats.go | 2 +- benchmark/worker/benchmark_client.go | 15 ++- 3 files changed, 60 insertions(+), 105 deletions(-) diff --git a/benchmark/stats/histogram.go b/benchmark/stats/histogram.go index fea014c2..0732a1c6 100644 --- a/benchmark/stats/histogram.go +++ b/benchmark/stats/histogram.go @@ -9,20 +9,35 @@ import ( "strings" ) -// HistogramValue is the value of Histogram objects. -type HistogramValue struct { - // Count is the total number of values added to the histogram. - Count int64 - // Sum is the sum of all the values added to the histogram. - Sum int64 - // SumOfSquares is the sum of squares of all values. +// Histogram accumulates values in the form of a histogram with +// exponentially increased bucket sizes. +// The first bucket (with index 0) is [0, n) where n = baseBucketSize. +// Bucket i (i>=1) contains [n * m^(i-1), n * m^i), where m = 1 + GrowthFactor. +// The type of the values is int64. +type Histogram struct { + opts HistogramOptions + Buckets []HistogramBucket + Count int64 + Sum int64 SumOfSquares int64 - // Min is the minimum of all the values added to the histogram. - Min int64 - // Max is the maximum of all the values added to the histogram. - Max int64 - // Buckets contains all the buckets of the histogram. - Buckets []HistogramBucket + Min int64 + Max int64 + + logBaseBucketSize float64 + oneOverLogOnePlusGrowthFactor float64 +} + +// HistogramOptions contains the parameters that define the histogram's buckets. +type HistogramOptions struct { + // NumBuckets is the number of buckets. 
+ NumBuckets int + // GrowthFactor is the growth factor of the buckets. A value of 0.1 + // indicates that bucket N+1 will be 10% larger than bucket N. + GrowthFactor float64 + // BaseBucketSize is the size of the first bucket. + BaseBucketSize float64 + // MinValue is the lower bound of the first bucket. + MinValue int64 } // HistogramBucket is one histogram bucket. @@ -34,7 +49,7 @@ type HistogramBucket struct { } // Print writes textual output of the histogram values. -func (v HistogramValue) Print(w io.Writer) { +func (v Histogram) Print(w io.Writer) { avg := float64(v.Sum) / float64(v.Count) fmt.Fprintf(w, "Count: %d Min: %d Max: %d Avg: %.2f\n", v.Count, v.Min, v.Max, avg) fmt.Fprintf(w, "%s\n", strings.Repeat("-", 60)) @@ -69,50 +84,12 @@ func (v HistogramValue) Print(w io.Writer) { } // String returns the textual output of the histogram values as string. -func (v HistogramValue) String() string { +func (v Histogram) String() string { var b bytes.Buffer v.Print(&b) return b.String() } -// Histogram accumulates values in the form of a histogram with -// exponentially increased bucket sizes. -// The first bucket (with index 0) is [0, n) where n = baseBucketSize. -// Bucket i (i>=1) contains [n * m^(i-1), n * m^i), where m = 1 + GrowthFactor. -// The type of the values is int64. -type Histogram struct { - opts HistogramOptions - buckets []bucketInternal - count int64 - sum int64 - sumOfSquares int64 - min int64 - max int64 - - logBaseBucketSize float64 - oneOverLogOnePlusGrowthFactor float64 -} - -// HistogramOptions contains the parameters that define the histogram's buckets. -type HistogramOptions struct { - // NumBuckets is the number of buckets. - NumBuckets int - // GrowthFactor is the growth factor of the buckets. A value of 0.1 - // indicates that bucket N+1 will be 10% larger than bucket N. - GrowthFactor float64 - // BaseBucketSize is the size of the first bucket. - BaseBucketSize float64 - // MinValue is the lower bound of the first bucket. 
- MinValue int64 -} - -// bucketInternal is the internal representation of a bucket, which includes a -// rate counter. -type bucketInternal struct { - lowBound float64 - count int64 -} - // NewHistogram returns a pointer to a new Histogram object that was created // with the provided options. func NewHistogram(opts HistogramOptions) *Histogram { @@ -124,18 +101,18 @@ func NewHistogram(opts HistogramOptions) *Histogram { } h := Histogram{ opts: opts, - buckets: make([]bucketInternal, opts.NumBuckets), - min: math.MaxInt64, - max: math.MinInt64, + Buckets: make([]HistogramBucket, opts.NumBuckets), + Min: math.MaxInt64, + Max: math.MinInt64, logBaseBucketSize: math.Log(opts.BaseBucketSize), oneOverLogOnePlusGrowthFactor: 1 / math.Log(1+opts.GrowthFactor), } m := 1.0 + opts.GrowthFactor delta := opts.BaseBucketSize - h.buckets[0].lowBound = float64(opts.MinValue) + h.Buckets[0].LowBound = float64(opts.MinValue) for i := 1; i < opts.NumBuckets; i++ { - h.buckets[i].lowBound = float64(opts.MinValue) + delta + h.Buckets[i].LowBound = float64(opts.MinValue) + delta delta = delta * m } return &h @@ -143,14 +120,14 @@ func NewHistogram(opts HistogramOptions) *Histogram { // Clear resets all the content of histogram. 
func (h *Histogram) Clear() { - h.count = 0 - h.sum = 0 - h.sumOfSquares = 0 - h.max = 0 - h.min = math.MaxInt64 - h.max = math.MinInt64 - for _, v := range h.buckets { - v.count = 0 + h.Count = 0 + h.Sum = 0 + h.SumOfSquares = 0 + h.Max = 0 + h.Min = math.MaxInt64 + h.Max = math.MinInt64 + for _, v := range h.Buckets { + v.Count = 0 } } @@ -165,40 +142,19 @@ func (h *Histogram) Add(value int64) error { if err != nil { return err } - h.buckets[bucket].count++ - h.count++ - h.sum += value - h.sumOfSquares += value * value - if value < h.min { - h.min = value + h.Buckets[bucket].Count++ + h.Count++ + h.Sum += value + h.SumOfSquares += value * value + if value < h.Min { + h.Min = value } - if value > h.max { - h.max = value + if value > h.Max { + h.Max = value } return nil } -// Value returns the accumulated state of the histogram since it was created. -func (h *Histogram) Value() HistogramValue { - b := make([]HistogramBucket, len(h.buckets)) - for i, v := range h.buckets { - b[i] = HistogramBucket{ - LowBound: v.lowBound, - Count: v.count, - } - } - - v := HistogramValue{ - Count: h.count, - Sum: h.sum, - SumOfSquares: h.sumOfSquares, - Min: h.min, - Max: h.max, - Buckets: b, - } - return v -} - func (h *Histogram) findBucket(value int64) (int, error) { delta := float64(value - h.opts.MinValue) var b int @@ -208,7 +164,7 @@ func (h *Histogram) findBucket(value int64) (int, error) { // = (log(delta) - log(baseBucketSize)) * (1 / log(1+growthFactor)) + 1 b = int((math.Log(delta)-h.logBaseBucketSize)*h.oneOverLogOnePlusGrowthFactor + 1) } - if b >= len(h.buckets) { + if b >= len(h.Buckets) { return 0, fmt.Errorf("no bucket for value: %d", value) } return b, nil diff --git a/benchmark/stats/stats.go b/benchmark/stats/stats.go index eef620d4..e0edb174 100644 --- a/benchmark/stats/stats.go +++ b/benchmark/stats/stats.go @@ -104,7 +104,7 @@ func (stats *Stats) Print(w io.Writer) { fmt.Fprint(w, "Histogram (empty)\n") } else { fmt.Fprintf(w, "Histogram (unit: %s)\n", 
fmt.Sprintf("%v", stats.unit)[1:]) - stats.histogram.Value().Print(w) + stats.histogram.Print(w) } } diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go index 61d17d7f..b2e0f41d 100644 --- a/benchmark/worker/benchmark_client.go +++ b/benchmark/worker/benchmark_client.go @@ -318,19 +318,18 @@ func (bc *benchmarkClient) getStats() *testpb.ClientStats { defer bc.mu.RUnlock() timeElapsed := time.Since(bc.lastResetTime).Seconds() - histogramValue := bc.histogram.Value() - b := make([]uint32, len(histogramValue.Buckets)) - for i, v := range histogramValue.Buckets { + b := make([]uint32, len(bc.histogram.Buckets)) + for i, v := range bc.histogram.Buckets { b[i] = uint32(v.Count) } return &testpb.ClientStats{ Latencies: &testpb.HistogramData{ Bucket: b, - MinSeen: float64(histogramValue.Min), - MaxSeen: float64(histogramValue.Max), - Sum: float64(histogramValue.Sum), - SumOfSquares: float64(histogramValue.SumOfSquares), - Count: float64(histogramValue.Count), + MinSeen: float64(bc.histogram.Min), + MaxSeen: float64(bc.histogram.Max), + Sum: float64(bc.histogram.Sum), + SumOfSquares: float64(bc.histogram.SumOfSquares), + Count: float64(bc.histogram.Count), }, TimeElapsed: timeElapsed, TimeUser: 0, From 43b6e14060b68d90ccef699267d88cb9d574afd2 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 5 May 2016 10:47:14 -0700 Subject: [PATCH 03/17] Add comments for exported fields --- benchmark/stats/histogram.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/benchmark/stats/histogram.go b/benchmark/stats/histogram.go index 0732a1c6..de2c8445 100644 --- a/benchmark/stats/histogram.go +++ b/benchmark/stats/histogram.go @@ -15,14 +15,20 @@ import ( // Bucket i (i>=1) contains [n * m^(i-1), n * m^i), where m = 1 + GrowthFactor. // The type of the values is int64. 
type Histogram struct { - opts HistogramOptions - Buckets []HistogramBucket - Count int64 - Sum int64 + // Count is the total number of values added to the histogram. + Count int64 + // Sum is the sum of all the values added to the histogram. + Sum int64 + // SumOfSquares is the sum of squares of all values. SumOfSquares int64 - Min int64 - Max int64 + // Min is the minimum of all the values added to the histogram. + Min int64 + // Max is the maximum of all the values added to the histogram. + Max int64 + // Buckets contains all the buckets of the histogram. + Buckets []HistogramBucket + opts HistogramOptions logBaseBucketSize float64 oneOverLogOnePlusGrowthFactor float64 } @@ -100,11 +106,11 @@ func NewHistogram(opts HistogramOptions) *Histogram { opts.BaseBucketSize = 1.0 } h := Histogram{ - opts: opts, Buckets: make([]HistogramBucket, opts.NumBuckets), Min: math.MaxInt64, Max: math.MinInt64, + opts: opts, logBaseBucketSize: math.Log(opts.BaseBucketSize), oneOverLogOnePlusGrowthFactor: 1 / math.Log(1+opts.GrowthFactor), } From 660a65ba7cc40a487ea75e0ef44922b145967165 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 6 May 2016 14:06:41 -0700 Subject: [PATCH 04/17] Minor fixes Move code around --- benchmark/stats/histogram.go | 57 ++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/benchmark/stats/histogram.go b/benchmark/stats/histogram.go index de2c8445..c6ff6080 100644 --- a/benchmark/stats/histogram.go +++ b/benchmark/stats/histogram.go @@ -54,6 +54,34 @@ type HistogramBucket struct { Count int64 } +// NewHistogram returns a pointer to a new Histogram object that was created +// with the provided options. 
+func NewHistogram(opts HistogramOptions) *Histogram { + if opts.NumBuckets == 0 { + opts.NumBuckets = 32 + } + if opts.BaseBucketSize == 0.0 { + opts.BaseBucketSize = 1.0 + } + h := Histogram{ + Buckets: make([]HistogramBucket, opts.NumBuckets), + Min: math.MaxInt64, + Max: math.MinInt64, + + opts: opts, + logBaseBucketSize: math.Log(opts.BaseBucketSize), + oneOverLogOnePlusGrowthFactor: 1 / math.Log(1+opts.GrowthFactor), + } + m := 1.0 + opts.GrowthFactor + delta := opts.BaseBucketSize + h.Buckets[0].LowBound = float64(opts.MinValue) + for i := 1; i < opts.NumBuckets; i++ { + h.Buckets[i].LowBound = float64(opts.MinValue) + delta + delta = delta * m + } + return &h +} + // Print writes textual output of the histogram values. func (v Histogram) Print(w io.Writer) { avg := float64(v.Sum) / float64(v.Count) @@ -96,40 +124,11 @@ func (v Histogram) String() string { return b.String() } -// NewHistogram returns a pointer to a new Histogram object that was created -// with the provided options. -func NewHistogram(opts HistogramOptions) *Histogram { - if opts.NumBuckets == 0 { - opts.NumBuckets = 32 - } - if opts.BaseBucketSize == 0.0 { - opts.BaseBucketSize = 1.0 - } - h := Histogram{ - Buckets: make([]HistogramBucket, opts.NumBuckets), - Min: math.MaxInt64, - Max: math.MinInt64, - - opts: opts, - logBaseBucketSize: math.Log(opts.BaseBucketSize), - oneOverLogOnePlusGrowthFactor: 1 / math.Log(1+opts.GrowthFactor), - } - m := 1.0 + opts.GrowthFactor - delta := opts.BaseBucketSize - h.Buckets[0].LowBound = float64(opts.MinValue) - for i := 1; i < opts.NumBuckets; i++ { - h.Buckets[i].LowBound = float64(opts.MinValue) + delta - delta = delta * m - } - return &h -} - // Clear resets all the content of histogram. 
func (h *Histogram) Clear() { h.Count = 0 h.Sum = 0 h.SumOfSquares = 0 - h.Max = 0 h.Min = math.MaxInt64 h.Max = math.MinInt64 for _, v := range h.Buckets { From 4e30886378a81d3ab8effa152a47906a7a0534c5 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 6 May 2016 13:52:20 -0700 Subject: [PATCH 05/17] Create a histogram for each goroutine --- benchmark/worker/benchmark_client.go | 78 ++++++++++++++++------------ 1 file changed, 45 insertions(+), 33 deletions(-) diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go index b2e0f41d..9c4c7a64 100644 --- a/benchmark/worker/benchmark_client.go +++ b/benchmark/worker/benchmark_client.go @@ -54,11 +54,12 @@ var ( ) type benchmarkClient struct { - closeConns func() - stop chan bool - mu sync.RWMutex - lastResetTime time.Time - histogram *stats.Histogram + closeConns func() + stop chan bool + lastResetTime time.Time + histogramOptions stats.HistogramOptions + mutexes []*sync.RWMutex + histograms []*stats.Histogram } func printClientConfig(config *testpb.ClientConfig) { @@ -199,13 +200,17 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) return nil, err } + rpcCountPerConn := int(config.OutstandingRpcsPerChannel) bc := &benchmarkClient{ - histogram: stats.NewHistogram(stats.HistogramOptions{ + histogramOptions: stats.HistogramOptions{ NumBuckets: int(math.Log(config.HistogramParams.MaxPossible)/math.Log(1+config.HistogramParams.Resolution)) + 1, GrowthFactor: config.HistogramParams.Resolution, BaseBucketSize: (1 + config.HistogramParams.Resolution), MinValue: 0, - }), + }, + mutexes: make([]*sync.RWMutex, rpcCountPerConn*len(conns)), + histograms: make([]*stats.Histogram, rpcCountPerConn*len(conns)), + stop: make(chan bool), lastResetTime: time.Now(), closeConns: closeConns, @@ -221,11 +226,16 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) } func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, 
rpcCountPerConn int, reqSize int, respSize int) { - for _, conn := range conns { + for ic, conn := range conns { client := testpb.NewBenchmarkServiceClient(conn) // For each connection, create rpcCountPerConn goroutines to do rpc. for j := 0; j < rpcCountPerConn; j++ { - go func() { + // Create mutex and histogram for each goroutine. + idx := ic*rpcCountPerConn + j + bc.mutexes[idx] = new(sync.RWMutex) + bc.histograms[idx] = stats.NewHistogram(bc.histogramOptions) + // Start goroutine on the created mutex and histogram. + go func(mu *sync.RWMutex, histogram *stats.Histogram) { // TODO: do warm up if necessary. // Now relying on worker client to reserve time to do warm up. // The worker client needs to wait for some time after client is created, @@ -242,9 +252,9 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe return } elapse := time.Since(start) - bc.mu.Lock() - bc.histogram.Add(int64(elapse)) - bc.mu.Unlock() + mu.Lock() + histogram.Add(int64(elapse)) + mu.Unlock() select { case <-bc.stop: case done <- true: @@ -256,7 +266,7 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe case <-done: } } - }() + }(bc.mutexes[idx], bc.histograms[idx]) } } } @@ -268,7 +278,7 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou } else { doRPC = benchmark.DoStreamingRoundTrip } - for _, conn := range conns { + for ic, conn := range conns { // For each connection, create rpcCountPerConn goroutines to do rpc. for j := 0; j < rpcCountPerConn; j++ { c := testpb.NewBenchmarkServiceClient(conn) @@ -276,8 +286,12 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou if err != nil { grpclog.Fatalf("%v.StreamingCall(_) = _, %v", c, err) } - // Create benchmark rpc goroutine. - go func() { + // Create mutex and histogram for each goroutine. 
+ idx := ic*rpcCountPerConn + j + bc.mutexes[idx] = new(sync.RWMutex) + bc.histograms[idx] = stats.NewHistogram(bc.histogramOptions) + // Start goroutine on the created mutex and histogram. + go func(mu *sync.RWMutex, histogram *stats.Histogram) { // TODO: do warm up if necessary. // Now relying on worker client to reserve time to do warm up. // The worker client needs to wait for some time after client is created, @@ -294,9 +308,9 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou return } elapse := time.Since(start) - bc.mu.Lock() - bc.histogram.Add(int64(elapse)) - bc.mu.Unlock() + mu.Lock() + histogram.Add(int64(elapse)) + mu.Unlock() select { case <-bc.stop: case done <- true: @@ -308,28 +322,28 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou case <-done: } } - }() + }(bc.mutexes[idx], bc.histograms[idx]) } } } func (bc *benchmarkClient) getStats() *testpb.ClientStats { - bc.mu.RLock() - defer bc.mu.RUnlock() timeElapsed := time.Since(bc.lastResetTime).Seconds() - b := make([]uint32, len(bc.histogram.Buckets)) - for i, v := range bc.histogram.Buckets { - b[i] = uint32(v.Count) + // TODO merge histograms. 
+ b := make([]uint32, len(bc.histograms[0].Buckets)) + var totalcount int64 + for _, h := range bc.histograms { + totalcount += h.Count } return &testpb.ClientStats{ Latencies: &testpb.HistogramData{ Bucket: b, - MinSeen: float64(bc.histogram.Min), - MaxSeen: float64(bc.histogram.Max), - Sum: float64(bc.histogram.Sum), - SumOfSquares: float64(bc.histogram.SumOfSquares), - Count: float64(bc.histogram.Count), + MinSeen: float64(bc.histograms[0].Min), + MaxSeen: float64(bc.histograms[0].Max), + Sum: float64(bc.histograms[0].Sum), + SumOfSquares: float64(bc.histograms[0].SumOfSquares), + Count: float64(totalcount), }, TimeElapsed: timeElapsed, TimeUser: 0, @@ -340,10 +354,8 @@ func (bc *benchmarkClient) getStats() *testpb.ClientStats { // reset clears the contents for histogram and set lastResetTime to Now(). // It is called to get ready for benchmark runs. func (bc *benchmarkClient) reset() { - bc.mu.Lock() - defer bc.mu.Unlock() bc.lastResetTime = time.Now() - bc.histogram.Clear() + bc.histograms[0].Clear() } func (bc *benchmarkClient) shutdown() { From 9ccc0dd9880349b1281acb8e62e5b571c28024ed Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 6 May 2016 14:15:49 -0700 Subject: [PATCH 06/17] Add histogram Merge function --- benchmark/stats/histogram.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/benchmark/stats/histogram.go b/benchmark/stats/histogram.go index c6ff6080..a77b51da 100644 --- a/benchmark/stats/histogram.go +++ b/benchmark/stats/histogram.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "io" + "log" "math" "strconv" "strings" @@ -174,3 +175,21 @@ func (h *Histogram) findBucket(value int64) (int, error) { } return b, nil } + +func (h *Histogram) Merge(h2 *Histogram) { + if len(h.Buckets) != len(h2.Buckets) { + log.Fatalf("failed to merge histograms, inequivalent buckets length") + } + h.Count += h2.Count + h.Sum += h2.Sum + h.SumOfSquares += h2.SumOfSquares + if h2.Min < h.Min { + h.Min = h2.Min + } + if h2.Max > h.Max { + h.Max = 
h2.Max + } + for i, b := range h2.Buckets { + h.Buckets[i].Count += b.Count + } +} From e1243883d65512f5699eb8c6a99f0059fda10058 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 6 May 2016 14:49:12 -0700 Subject: [PATCH 07/17] Merge histograms into one histogram in getStats --- benchmark/worker/benchmark_client.go | 52 +++++++++++++++++----------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go index 9c4c7a64..03468950 100644 --- a/benchmark/worker/benchmark_client.go +++ b/benchmark/worker/benchmark_client.go @@ -235,7 +235,7 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe bc.mutexes[idx] = new(sync.RWMutex) bc.histograms[idx] = stats.NewHistogram(bc.histogramOptions) // Start goroutine on the created mutex and histogram. - go func(mu *sync.RWMutex, histogram *stats.Histogram) { + go func(idx int) { // TODO: do warm up if necessary. // Now relying on worker client to reserve time to do warm up. // The worker client needs to wait for some time after client is created, @@ -252,9 +252,9 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe return } elapse := time.Since(start) - mu.Lock() - histogram.Add(int64(elapse)) - mu.Unlock() + bc.mutexes[idx].Lock() + bc.histograms[idx].Add(int64(elapse)) + bc.mutexes[idx].Unlock() select { case <-bc.stop: case done <- true: @@ -266,7 +266,7 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe case <-done: } } - }(bc.mutexes[idx], bc.histograms[idx]) + }(idx) } } } @@ -291,7 +291,7 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou bc.mutexes[idx] = new(sync.RWMutex) bc.histograms[idx] = stats.NewHistogram(bc.histogramOptions) // Start goroutine on the created mutex and histogram. - go func(mu *sync.RWMutex, histogram *stats.Histogram) { + go func(idx int) { // TODO: do warm up if necessary. 
// Now relying on worker client to reserve time to do warm up. // The worker client needs to wait for some time after client is created, @@ -308,9 +308,9 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou return } elapse := time.Since(start) - mu.Lock() - histogram.Add(int64(elapse)) - mu.Unlock() + bc.mutexes[idx].Lock() + bc.histograms[idx].Add(int64(elapse)) + bc.mutexes[idx].Unlock() select { case <-bc.stop: case done <- true: @@ -322,7 +322,7 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou case <-done: } } - }(bc.mutexes[idx], bc.histograms[idx]) + }(idx) } } } @@ -330,20 +330,32 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou func (bc *benchmarkClient) getStats() *testpb.ClientStats { timeElapsed := time.Since(bc.lastResetTime).Seconds() - // TODO merge histograms. - b := make([]uint32, len(bc.histograms[0].Buckets)) - var totalcount int64 - for _, h := range bc.histograms { - totalcount += h.Count + // Merging histogram may take some time. + // Put all histograms aside and merge later. 
+ toMerge := make([]*stats.Histogram, len(bc.histograms)) + for i := range bc.histograms { + bc.mutexes[i].Lock() + toMerge[i] = bc.histograms[i] + bc.histograms[i] = stats.NewHistogram(bc.histogramOptions) + bc.mutexes[i].Unlock() + } + + for i := 1; i < len(toMerge); i++ { + toMerge[0].Merge(toMerge[i]) + } + + b := make([]uint32, len(toMerge[0].Buckets)) + for i, v := range toMerge[0].Buckets { + b[i] = uint32(v.Count) } return &testpb.ClientStats{ Latencies: &testpb.HistogramData{ Bucket: b, - MinSeen: float64(bc.histograms[0].Min), - MaxSeen: float64(bc.histograms[0].Max), - Sum: float64(bc.histograms[0].Sum), - SumOfSquares: float64(bc.histograms[0].SumOfSquares), - Count: float64(totalcount), + MinSeen: float64(toMerge[0].Min), + MaxSeen: float64(toMerge[0].Max), + Sum: float64(toMerge[0].Sum), + SumOfSquares: float64(toMerge[0].SumOfSquares), + Count: float64(toMerge[0].Count), }, TimeElapsed: timeElapsed, TimeUser: 0, From 9521f8ddae031564caedcd380a2faf4ce360b272 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 11 May 2016 12:02:39 -0700 Subject: [PATCH 08/17] Change RWMutex to Mutex --- benchmark/worker/benchmark_client.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go index 03468950..74304f33 100644 --- a/benchmark/worker/benchmark_client.go +++ b/benchmark/worker/benchmark_client.go @@ -58,7 +58,7 @@ type benchmarkClient struct { stop chan bool lastResetTime time.Time histogramOptions stats.HistogramOptions - mutexes []*sync.RWMutex + mutexes []*sync.Mutex histograms []*stats.Histogram } @@ -208,7 +208,7 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) BaseBucketSize: (1 + config.HistogramParams.Resolution), MinValue: 0, }, - mutexes: make([]*sync.RWMutex, rpcCountPerConn*len(conns)), + mutexes: make([]*sync.Mutex, rpcCountPerConn*len(conns)), histograms: make([]*stats.Histogram, rpcCountPerConn*len(conns)), stop: 
make(chan bool), @@ -232,7 +232,7 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe for j := 0; j < rpcCountPerConn; j++ { // Create mutex and histogram for each goroutine. idx := ic*rpcCountPerConn + j - bc.mutexes[idx] = new(sync.RWMutex) + bc.mutexes[idx] = new(sync.Mutex) bc.histograms[idx] = stats.NewHistogram(bc.histogramOptions) // Start goroutine on the created mutex and histogram. go func(idx int) { @@ -288,7 +288,7 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou } // Create mutex and histogram for each goroutine. idx := ic*rpcCountPerConn + j - bc.mutexes[idx] = new(sync.RWMutex) + bc.mutexes[idx] = new(sync.Mutex) bc.histograms[idx] = stats.NewHistogram(bc.histogramOptions) // Start goroutine on the created mutex and histogram. go func(idx int) { From 423a9dea781499ea71eb752fc5c21336b5b289c5 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 11 May 2016 17:20:16 -0700 Subject: [PATCH 09/17] Change getStats to do resetting if required. --- benchmark/worker/benchmark_client.go | 61 ++++++++++++++-------------- benchmark/worker/benchmark_server.go | 16 ++++---- benchmark/worker/main.go | 14 ++----- 3 files changed, 46 insertions(+), 45 deletions(-) diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go index 74304f33..37b9ae24 100644 --- a/benchmark/worker/benchmark_client.go +++ b/benchmark/worker/benchmark_client.go @@ -327,35 +327,49 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou } } -func (bc *benchmarkClient) getStats() *testpb.ClientStats { +// getStats return the stats for benchmark client. +// It resets lastResetTime and all histograms if argument reset is true. +func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats { timeElapsed := time.Since(bc.lastResetTime).Seconds() + mergedHistogram := stats.NewHistogram(bc.histogramOptions) - // Merging histogram may take some time. 
- // Put all histograms aside and merge later. - toMerge := make([]*stats.Histogram, len(bc.histograms)) - for i := range bc.histograms { - bc.mutexes[i].Lock() - toMerge[i] = bc.histograms[i] - bc.histograms[i] = stats.NewHistogram(bc.histogramOptions) - bc.mutexes[i].Unlock() + if reset { + bc.lastResetTime = time.Now() + + // Merging histogram may take some time. + // Put all histograms aside and merge later. + toMerge := make([]*stats.Histogram, len(bc.histograms)) + for i := range bc.histograms { + bc.mutexes[i].Lock() + toMerge[i] = bc.histograms[i] + bc.histograms[i] = stats.NewHistogram(bc.histogramOptions) + bc.mutexes[i].Unlock() + } + + for i := 0; i < len(toMerge); i++ { + mergedHistogram.Merge(toMerge[i]) + } + } else { + // Only merging histograms, not resetting. + for i := range bc.histograms { + bc.mutexes[i].Lock() + mergedHistogram.Merge(bc.histograms[i]) + bc.mutexes[i].Unlock() + } } - for i := 1; i < len(toMerge); i++ { - toMerge[0].Merge(toMerge[i]) - } - - b := make([]uint32, len(toMerge[0].Buckets)) - for i, v := range toMerge[0].Buckets { + b := make([]uint32, len(mergedHistogram.Buckets)) + for i, v := range mergedHistogram.Buckets { b[i] = uint32(v.Count) } return &testpb.ClientStats{ Latencies: &testpb.HistogramData{ Bucket: b, - MinSeen: float64(toMerge[0].Min), - MaxSeen: float64(toMerge[0].Max), - Sum: float64(toMerge[0].Sum), - SumOfSquares: float64(toMerge[0].SumOfSquares), - Count: float64(toMerge[0].Count), + MinSeen: float64(mergedHistogram.Min), + MaxSeen: float64(mergedHistogram.Max), + Sum: float64(mergedHistogram.Sum), + SumOfSquares: float64(mergedHistogram.SumOfSquares), + Count: float64(mergedHistogram.Count), }, TimeElapsed: timeElapsed, TimeUser: 0, @@ -363,13 +377,6 @@ func (bc *benchmarkClient) getStats() *testpb.ClientStats { } } -// reset clears the contents for histogram and set lastResetTime to Now(). -// It is called to get ready for benchmark runs. 
-func (bc *benchmarkClient) reset() { - bc.lastResetTime = time.Now() - bc.histograms[0].Clear() -} - func (bc *benchmarkClient) shutdown() { close(bc.stop) bc.closeConns() diff --git a/benchmark/worker/benchmark_server.go b/benchmark/worker/benchmark_server.go index 7ccb069c..f0e38e03 100644 --- a/benchmark/worker/benchmark_server.go +++ b/benchmark/worker/benchmark_server.go @@ -158,15 +158,15 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma return &benchmarkServer{port: p, cores: numOfCores, closeFunc: closeFunc, lastResetTime: time.Now()}, nil } -func (bs *benchmarkServer) getStats() *testpb.ServerStats { +// getStats return the stats for benchmark server. +// It resets lastResetTime if argument reset is true. +func (bs *benchmarkServer) getStats(reset bool) *testpb.ServerStats { // TODO wall time, sys time, user time. bs.mu.RLock() defer bs.mu.RUnlock() - return &testpb.ServerStats{TimeElapsed: time.Since(bs.lastResetTime).Seconds(), TimeUser: 0, TimeSystem: 0} -} - -func (bs *benchmarkServer) reset() { - bs.mu.Lock() - defer bs.mu.Unlock() - bs.lastResetTime = time.Now() + timeElapsed := time.Since(bs.lastResetTime).Seconds() + if reset { + bs.lastResetTime = time.Now() + } + return &testpb.ServerStats{TimeElapsed: timeElapsed, TimeUser: 0, TimeSystem: 0} } diff --git a/benchmark/worker/main.go b/benchmark/worker/main.go index 236ca8df..8fc2160c 100644 --- a/benchmark/worker/main.go +++ b/benchmark/worker/main.go @@ -116,7 +116,7 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er return err } out = &testpb.ServerStatus{ - Stats: bs.getStats(), + Stats: bs.getStats(false), Port: int32(bs.port), Cores: int32(bs.cores), } @@ -128,13 +128,10 @@ func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) er return grpc.Errorf(codes.InvalidArgument, "server does not exist when mark received") } out = &testpb.ServerStatus{ - Stats: bs.getStats(), + Stats: 
bs.getStats(argtype.Mark.Reset_), Port: int32(bs.port), Cores: int32(bs.cores), } - if argtype.Mark.Reset_ { - bs.reset() - } } if err := stream.Send(out); err != nil { @@ -176,7 +173,7 @@ func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) er return err } out = &testpb.ClientStatus{ - Stats: bc.getStats(), + Stats: bc.getStats(false), } case *testpb.ClientArgs_Mark: @@ -186,10 +183,7 @@ func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) er return grpc.Errorf(codes.InvalidArgument, "client does not exist when mark received") } out = &testpb.ClientStatus{ - Stats: bc.getStats(), - } - if t.Mark.Reset_ { - bc.reset() + Stats: bc.getStats(t.Mark.Reset_), } } From 0dc39fe27f3955701870c242c82ec6fa77886948 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 11 May 2016 17:32:28 -0700 Subject: [PATCH 10/17] Golint and histogram opts compare --- benchmark/stats/histogram.go | 30 +++++++++++++++------------- benchmark/worker/benchmark_client.go | 4 ++-- benchmark/worker/benchmark_server.go | 2 +- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/benchmark/stats/histogram.go b/benchmark/stats/histogram.go index a77b51da..140b6ea4 100644 --- a/benchmark/stats/histogram.go +++ b/benchmark/stats/histogram.go @@ -84,27 +84,27 @@ func NewHistogram(opts HistogramOptions) *Histogram { } // Print writes textual output of the histogram values. 
-func (v Histogram) Print(w io.Writer) { - avg := float64(v.Sum) / float64(v.Count) - fmt.Fprintf(w, "Count: %d Min: %d Max: %d Avg: %.2f\n", v.Count, v.Min, v.Max, avg) +func (h Histogram) Print(w io.Writer) { + avg := float64(h.Sum) / float64(h.Count) + fmt.Fprintf(w, "Count: %d Min: %d Max: %d Avg: %.2f\n", h.Count, h.Min, h.Max, avg) fmt.Fprintf(w, "%s\n", strings.Repeat("-", 60)) - if v.Count <= 0 { + if h.Count <= 0 { return } - maxBucketDigitLen := len(strconv.FormatFloat(v.Buckets[len(v.Buckets)-1].LowBound, 'f', 6, 64)) + maxBucketDigitLen := len(strconv.FormatFloat(h.Buckets[len(h.Buckets)-1].LowBound, 'f', 6, 64)) if maxBucketDigitLen < 3 { // For "inf". maxBucketDigitLen = 3 } - maxCountDigitLen := len(strconv.FormatInt(v.Count, 10)) - percentMulti := 100 / float64(v.Count) + maxCountDigitLen := len(strconv.FormatInt(h.Count, 10)) + percentMulti := 100 / float64(h.Count) accCount := int64(0) - for i, b := range v.Buckets { + for i, b := range h.Buckets { fmt.Fprintf(w, "[%*f, ", maxBucketDigitLen, b.LowBound) - if i+1 < len(v.Buckets) { - fmt.Fprintf(w, "%*f)", maxBucketDigitLen, v.Buckets[i+1].LowBound) + if i+1 < len(h.Buckets) { + fmt.Fprintf(w, "%*f)", maxBucketDigitLen, h.Buckets[i+1].LowBound) } else { fmt.Fprintf(w, "%*s)", maxBucketDigitLen, "inf") } @@ -119,9 +119,9 @@ func (v Histogram) Print(w io.Writer) { } // String returns the textual output of the histogram values as string. -func (v Histogram) String() string { +func (h Histogram) String() string { var b bytes.Buffer - v.Print(&b) + h.Print(&b) return b.String() } @@ -176,9 +176,11 @@ func (h *Histogram) findBucket(value int64) (int, error) { return b, nil } +// Merge takes another histogram, and merges its content into the receiver. +// The two histograms must be created by equivalent HistogramOptions. 
func (h *Histogram) Merge(h2 *Histogram) { - if len(h.Buckets) != len(h2.Buckets) { - log.Fatalf("failed to merge histograms, inequivalent buckets length") + if h.opts != h2.opts { + log.Fatalf("failed to merge histograms, created by inequivalent options") } h.Count += h2.Count h.Sum += h2.Sum diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go index 37b9ae24..26b31a3b 100644 --- a/benchmark/worker/benchmark_client.go +++ b/benchmark/worker/benchmark_client.go @@ -327,7 +327,7 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou } } -// getStats return the stats for benchmark client. +// getStats returns the stats for benchmark client. // It resets lastResetTime and all histograms if argument reset is true. func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats { timeElapsed := time.Since(bc.lastResetTime).Seconds() @@ -350,7 +350,7 @@ func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats { mergedHistogram.Merge(toMerge[i]) } } else { - // Only merging histograms, not resetting. + // Merge only, not reset. for i := range bc.histograms { bc.mutexes[i].Lock() mergedHistogram.Merge(bc.histograms[i]) diff --git a/benchmark/worker/benchmark_server.go b/benchmark/worker/benchmark_server.go index f0e38e03..dd53e47a 100644 --- a/benchmark/worker/benchmark_server.go +++ b/benchmark/worker/benchmark_server.go @@ -158,7 +158,7 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma return &benchmarkServer{port: p, cores: numOfCores, closeFunc: closeFunc, lastResetTime: time.Now()}, nil } -// getStats return the stats for benchmark server. +// getStats returns the stats for benchmark server. // It resets lastResetTime if argument reset is true. func (bs *benchmarkServer) getStats(reset bool) *testpb.ServerStats { // TODO wall time, sys time, user time. 
From 7bb7ec5e30b67d3ae5fdf0f5d5f919beb7e4ca1b Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 12 May 2016 10:44:43 -0700 Subject: [PATCH 11/17] Reset lastResetTime after merging histograms --- benchmark/worker/benchmark_client.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go index 26b31a3b..79542818 100644 --- a/benchmark/worker/benchmark_client.go +++ b/benchmark/worker/benchmark_client.go @@ -330,12 +330,10 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou // getStats returns the stats for benchmark client. // It resets lastResetTime and all histograms if argument reset is true. func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats { - timeElapsed := time.Since(bc.lastResetTime).Seconds() + var timeElapsed float64 mergedHistogram := stats.NewHistogram(bc.histogramOptions) if reset { - bc.lastResetTime = time.Now() - // Merging histogram may take some time. // Put all histograms aside and merge later. toMerge := make([]*stats.Histogram, len(bc.histograms)) @@ -349,6 +347,9 @@ func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats { for i := 0; i < len(toMerge); i++ { mergedHistogram.Merge(toMerge[i]) } + + timeElapsed = time.Since(bc.lastResetTime).Seconds() + bc.lastResetTime = time.Now() } else { // Merge only, not reset. 
for i := range bc.histograms { @@ -356,6 +357,7 @@ func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats { mergedHistogram.Merge(bc.histograms[i]) bc.mutexes[i].Unlock() } + timeElapsed = time.Since(bc.lastResetTime).Seconds() } b := make([]uint32, len(mergedHistogram.Buckets)) From 577b4b0d4c48abb73df9570ca5eb6151b70e334c Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 13 May 2016 10:36:50 -0700 Subject: [PATCH 12/17] Address review comments --- benchmark/stats/histogram.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/benchmark/stats/histogram.go b/benchmark/stats/histogram.go index 140b6ea4..c0e9a193 100644 --- a/benchmark/stats/histogram.go +++ b/benchmark/stats/histogram.go @@ -12,9 +12,6 @@ import ( // Histogram accumulates values in the form of a histogram with // exponentially increased bucket sizes. -// The first bucket (with index 0) is [0, n) where n = baseBucketSize. -// Bucket i (i>=1) contains [n * m^(i-1), n * m^i), where m = 1 + GrowthFactor. -// The type of the values is int64. type Histogram struct { // Count is the total number of values added to the histogram. Count int64 @@ -35,6 +32,10 @@ type Histogram struct { } // HistogramOptions contains the parameters that define the histogram's buckets. +// The first bucket of the created histogram (with index 0) contains [min, min+n) +// where n = BaseBucketSize, min = MinValue. +// Bucket i (i>=1) contains [min + n * m^(i-1), min + n * m^i), where m = 1+GrowthFactor. +// The type of the values is int64. type HistogramOptions struct { // NumBuckets is the number of buckets. NumBuckets int @@ -47,7 +48,7 @@ type HistogramOptions struct { MinValue int64 } -// HistogramBucket is one histogram bucket. +// HistogramBucket represents one histogram bucket. type HistogramBucket struct { // LowBound is the lower bound of the bucket. 
LowBound float64 From 8ca2706d37f81dd936f348cf7c42b3cd9388c97c Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 16 May 2016 18:28:16 -0700 Subject: [PATCH 13/17] Address review comments --- benchmark/stats/histogram.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/benchmark/stats/histogram.go b/benchmark/stats/histogram.go index c0e9a193..099bcd65 100644 --- a/benchmark/stats/histogram.go +++ b/benchmark/stats/histogram.go @@ -85,7 +85,7 @@ func NewHistogram(opts HistogramOptions) *Histogram { } // Print writes textual output of the histogram values. -func (h Histogram) Print(w io.Writer) { +func (h *Histogram) Print(w io.Writer) { avg := float64(h.Sum) / float64(h.Count) fmt.Fprintf(w, "Count: %d Min: %d Max: %d Avg: %.2f\n", h.Count, h.Min, h.Max, avg) fmt.Fprintf(w, "%s\n", strings.Repeat("-", 60)) @@ -120,7 +120,7 @@ func (h Histogram) Print(w io.Writer) { } // String returns the textual output of the histogram values as string. -func (h Histogram) String() string { +func (h *Histogram) String() string { var b bytes.Buffer h.Print(&b) return b.String() @@ -177,7 +177,7 @@ func (h *Histogram) findBucket(value int64) (int, error) { return b, nil } -// Merge takes another histogram, and merges its content into the receiver. +// Merge takes another histogram h2, and merges its content into h. // The two histograms must be created by equivalent HistogramOptions. 
func (h *Histogram) Merge(h2 *Histogram) { if h.opts != h2.opts { From 6c972937125fcd1112b5a5fd89e61fff5610fe27 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 16 May 2016 18:29:06 -0700 Subject: [PATCH 14/17] Use a struct of mutex and histogram in client --- benchmark/worker/benchmark_client.go | 57 ++++++++++++++-------------- 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go index 79542818..d96b228d 100644 --- a/benchmark/worker/benchmark_client.go +++ b/benchmark/worker/benchmark_client.go @@ -53,13 +53,17 @@ var ( caFile = "benchmark/server/testdata/ca.pem" ) +type lockingHistogram struct { + mu sync.Mutex + histogram *stats.Histogram +} + type benchmarkClient struct { - closeConns func() - stop chan bool - lastResetTime time.Time - histogramOptions stats.HistogramOptions - mutexes []*sync.Mutex - histograms []*stats.Histogram + closeConns func() + stop chan bool + lastResetTime time.Time + histogramOptions stats.HistogramOptions + lockingHistograms []lockingHistogram } func printClientConfig(config *testpb.ClientConfig) { @@ -208,8 +212,7 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) BaseBucketSize: (1 + config.HistogramParams.Resolution), MinValue: 0, }, - mutexes: make([]*sync.Mutex, rpcCountPerConn*len(conns)), - histograms: make([]*stats.Histogram, rpcCountPerConn*len(conns)), + lockingHistograms: make([]lockingHistogram, rpcCountPerConn*len(conns), rpcCountPerConn*len(conns)), stop: make(chan bool), lastResetTime: time.Now(), @@ -232,8 +235,7 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe for j := 0; j < rpcCountPerConn; j++ { // Create mutex and histogram for each goroutine. 
idx := ic*rpcCountPerConn + j - bc.mutexes[idx] = new(sync.Mutex) - bc.histograms[idx] = stats.NewHistogram(bc.histogramOptions) + bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions) // Start goroutine on the created mutex and histogram. go func(idx int) { // TODO: do warm up if necessary. @@ -252,9 +254,9 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe return } elapse := time.Since(start) - bc.mutexes[idx].Lock() - bc.histograms[idx].Add(int64(elapse)) - bc.mutexes[idx].Unlock() + bc.lockingHistograms[idx].mu.Lock() + bc.lockingHistograms[idx].histogram.Add(int64(elapse)) + bc.lockingHistograms[idx].mu.Unlock() select { case <-bc.stop: case done <- true: @@ -288,8 +290,7 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou } // Create mutex and histogram for each goroutine. idx := ic*rpcCountPerConn + j - bc.mutexes[idx] = new(sync.Mutex) - bc.histograms[idx] = stats.NewHistogram(bc.histogramOptions) + bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions) // Start goroutine on the created mutex and histogram. go func(idx int) { // TODO: do warm up if necessary. @@ -308,9 +309,9 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou return } elapse := time.Since(start) - bc.mutexes[idx].Lock() - bc.histograms[idx].Add(int64(elapse)) - bc.mutexes[idx].Unlock() + bc.lockingHistograms[idx].mu.Lock() + bc.lockingHistograms[idx].histogram.Add(int64(elapse)) + bc.lockingHistograms[idx].mu.Unlock() select { case <-bc.stop: case done <- true: @@ -336,12 +337,12 @@ func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats { if reset { // Merging histogram may take some time. // Put all histograms aside and merge later. 
- toMerge := make([]*stats.Histogram, len(bc.histograms)) - for i := range bc.histograms { - bc.mutexes[i].Lock() - toMerge[i] = bc.histograms[i] - bc.histograms[i] = stats.NewHistogram(bc.histogramOptions) - bc.mutexes[i].Unlock() + toMerge := make([]*stats.Histogram, len(bc.lockingHistograms)) + for i := range bc.lockingHistograms { + bc.lockingHistograms[i].mu.Lock() + toMerge[i] = bc.lockingHistograms[i].histogram + bc.lockingHistograms[i].histogram = stats.NewHistogram(bc.histogramOptions) + bc.lockingHistograms[i].mu.Unlock() } for i := 0; i < len(toMerge); i++ { @@ -352,10 +353,10 @@ func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats { bc.lastResetTime = time.Now() } else { // Merge only, not reset. - for i := range bc.histograms { - bc.mutexes[i].Lock() - mergedHistogram.Merge(bc.histograms[i]) - bc.mutexes[i].Unlock() + for i := range bc.lockingHistograms { + bc.lockingHistograms[i].mu.Lock() + mergedHistogram.Merge(bc.lockingHistograms[i].histogram) + bc.lockingHistograms[i].mu.Unlock() } timeElapsed = time.Since(bc.lastResetTime).Seconds() } From 551c49844f32f1d33f39bcf4293263affdb4f12b Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 16 May 2016 18:30:58 -0700 Subject: [PATCH 15/17] Specify cap of slices --- benchmark/worker/benchmark_client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go index d96b228d..b6d3c135 100644 --- a/benchmark/worker/benchmark_client.go +++ b/benchmark/worker/benchmark_client.go @@ -135,7 +135,7 @@ func createConns(config *testpb.ClientConfig) ([]*grpc.ClientConn, func(), error // Create connections. 
connCount := int(config.ClientChannels) - conns := make([]*grpc.ClientConn, connCount) + conns := make([]*grpc.ClientConn, connCount, connCount) for connIndex := 0; connIndex < connCount; connIndex++ { conns[connIndex] = benchmark.NewClientConn(config.ServerTargets[connIndex%len(config.ServerTargets)], opts...) } @@ -337,7 +337,7 @@ func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats { if reset { // Merging histogram may take some time. // Put all histograms aside and merge later. - toMerge := make([]*stats.Histogram, len(bc.lockingHistograms)) + toMerge := make([]*stats.Histogram, len(bc.lockingHistograms), len(bc.lockingHistograms)) for i := range bc.lockingHistograms { bc.lockingHistograms[i].mu.Lock() toMerge[i] = bc.lockingHistograms[i].histogram @@ -361,7 +361,7 @@ func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats { timeElapsed = time.Since(bc.lastResetTime).Seconds() } - b := make([]uint32, len(mergedHistogram.Buckets)) + b := make([]uint32, len(mergedHistogram.Buckets), len(mergedHistogram.Buckets)) for i, v := range mergedHistogram.Buckets { b[i] = uint32(v.Count) } From 71a11ef208454461a27f1232298f1e81b8461097 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 17 May 2016 10:59:28 -0700 Subject: [PATCH 16/17] Wrap for lockingHistogram.add swap mergeInto --- benchmark/worker/benchmark_client.go | 38 ++++++++++++++++++---------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go index b6d3c135..8419e042 100644 --- a/benchmark/worker/benchmark_client.go +++ b/benchmark/worker/benchmark_client.go @@ -58,6 +58,27 @@ type lockingHistogram struct { histogram *stats.Histogram } +func (h lockingHistogram) add(value int64) { + h.mu.Lock() + defer h.mu.Unlock() + h.histogram.Add(value) +} + +// swap sets h.histogram to new, and return its old value. 
+func (h lockingHistogram) swap(new *stats.Histogram) *stats.Histogram { + h.mu.Lock() + defer h.mu.Unlock() + old := h.histogram + h.histogram = new + return old +} + +func (h lockingHistogram) mergeInto(merged *stats.Histogram) { + h.mu.Lock() + defer h.mu.Unlock() + merged.Merge(h.histogram) +} + type benchmarkClient struct { closeConns func() stop chan bool @@ -254,9 +275,7 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe return } elapse := time.Since(start) - bc.lockingHistograms[idx].mu.Lock() - bc.lockingHistograms[idx].histogram.Add(int64(elapse)) - bc.lockingHistograms[idx].mu.Unlock() + bc.lockingHistograms[idx].add(int64(elapse)) select { case <-bc.stop: case done <- true: @@ -309,9 +328,7 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou return } elapse := time.Since(start) - bc.lockingHistograms[idx].mu.Lock() - bc.lockingHistograms[idx].histogram.Add(int64(elapse)) - bc.lockingHistograms[idx].mu.Unlock() + bc.lockingHistograms[idx].add(int64(elapse)) select { case <-bc.stop: case done <- true: @@ -339,10 +356,7 @@ func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats { // Put all histograms aside and merge later. toMerge := make([]*stats.Histogram, len(bc.lockingHistograms), len(bc.lockingHistograms)) for i := range bc.lockingHistograms { - bc.lockingHistograms[i].mu.Lock() - toMerge[i] = bc.lockingHistograms[i].histogram - bc.lockingHistograms[i].histogram = stats.NewHistogram(bc.histogramOptions) - bc.lockingHistograms[i].mu.Unlock() + toMerge[i] = bc.lockingHistograms[i].swap(stats.NewHistogram(bc.histogramOptions)) } for i := 0; i < len(toMerge); i++ { @@ -354,9 +368,7 @@ func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats { } else { // Merge only, not reset. 
for i := range bc.lockingHistograms { - bc.lockingHistograms[i].mu.Lock() - mergedHistogram.Merge(bc.lockingHistograms[i].histogram) - bc.lockingHistograms[i].mu.Unlock() + bc.lockingHistograms[i].mergeInto(mergedHistogram) } timeElapsed = time.Since(bc.lastResetTime).Seconds() } From 613465d739b2cec84abe3a8477ccd190f5557672 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 17 May 2016 11:08:03 -0700 Subject: [PATCH 17/17] Use pointer receiver for locking histogram --- benchmark/worker/benchmark_client.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go index 8419e042..a3d1e1f5 100644 --- a/benchmark/worker/benchmark_client.go +++ b/benchmark/worker/benchmark_client.go @@ -58,14 +58,14 @@ type lockingHistogram struct { histogram *stats.Histogram } -func (h lockingHistogram) add(value int64) { +func (h *lockingHistogram) add(value int64) { h.mu.Lock() defer h.mu.Unlock() h.histogram.Add(value) } -// swap sets h.histogram to new, and return its old value. -func (h lockingHistogram) swap(new *stats.Histogram) *stats.Histogram { +// swap sets h.histogram to new, and returns its old value. +func (h *lockingHistogram) swap(new *stats.Histogram) *stats.Histogram { h.mu.Lock() defer h.mu.Unlock() old := h.histogram @@ -73,7 +73,7 @@ func (h lockingHistogram) swap(new *stats.Histogram) *stats.Histogram { return old } -func (h lockingHistogram) mergeInto(merged *stats.Histogram) { +func (h *lockingHistogram) mergeInto(merged *stats.Histogram) { h.mu.Lock() defer h.mu.Unlock() merged.Merge(h.histogram) @@ -254,7 +254,7 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe client := testpb.NewBenchmarkServiceClient(conn) // For each connection, create rpcCountPerConn goroutines to do rpc. for j := 0; j < rpcCountPerConn; j++ { - // Create mutex and histogram for each goroutine. + // Create histogram for each goroutine. 
idx := ic*rpcCountPerConn + j bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions) // Start goroutine on the created mutex and histogram. @@ -307,7 +307,7 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou if err != nil { grpclog.Fatalf("%v.StreamingCall(_) = _, %v", c, err) } - // Create mutex and histogram for each goroutine. + // Create histogram for each goroutine. idx := ic*rpcCountPerConn + j bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions) // Start goroutine on the created mutex and histogram.