From 13535f71d1e916dde66ddca327009a5d9398b52a Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 5 Feb 2020 03:32:31 -0800 Subject: [PATCH] rls: Implementation of the adaptive client-side throttler. (#3346) --- balancer/rls/internal/adaptive/adaptive.go | 132 ++++++++++ .../rls/internal/adaptive/adaptive_test.go | 231 ++++++++++++++++++ balancer/rls/internal/adaptive/lookback.go | 91 +++++++ .../rls/internal/adaptive/lookback_test.go | 85 +++++++ 4 files changed, 539 insertions(+) create mode 100644 balancer/rls/internal/adaptive/adaptive.go create mode 100644 balancer/rls/internal/adaptive/adaptive_test.go create mode 100644 balancer/rls/internal/adaptive/lookback.go create mode 100644 balancer/rls/internal/adaptive/lookback_test.go diff --git a/balancer/rls/internal/adaptive/adaptive.go b/balancer/rls/internal/adaptive/adaptive.go new file mode 100644 index 00000000..4adae1bd --- /dev/null +++ b/balancer/rls/internal/adaptive/adaptive.go @@ -0,0 +1,132 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package adaptive provides functionality for adaptive client-side throttling. +package adaptive + +import ( + "sync" + "time" + + "google.golang.org/grpc/internal/grpcrand" +) + +// For overriding in unittests. +var ( + timeNowFunc = func() time.Time { return time.Now() } + randFunc = func() float64 { return grpcrand.Float64() } +) + +const ( + defaultDuration = 30 * time.Second + defaultBins = 100 + defaultRatioForAccepts = 2.0 + defaultRequestsPadding = 8.0 +) + +// Throttler implements a client-side throttling recommendation system. All +// methods are safe for concurrent use by multiple goroutines. +// +// The throttler has the following knobs for which we will use defaults for +// now. If there is a need to make them configurable at a later point in time, +// support for the same will be added. +// * Duration: amount of recent history that will be taken into account for +// making client-side throttling decisions. A default of 30 seconds is used. +// * Bins: number of bins to be used for bucketing historical data. A default +// of 100 is used. +// * RatioForAccepts: ratio by which accepts are multiplied, typically a value +// slightly larger than 1.0. This is used to make the throttler behave as if +// the backend had accepted more requests than it actually has, which lets us +// err on the side of sending to the backend more requests than we think it +// will accept for the sake of speeding up the propagation of state. A +// default of 2.0 is used. +// * RequestsPadding: is used to decrease the (client-side) throttling +// probability in the low QPS regime (to speed up propagation of state), as +// well as to safeguard against hitting a client-side throttling probability +// of 100%. The weight of this value decreases as the number of requests in +// recent history grows. A default of 8 is used. +// +// The adaptive throttler attempts to estimate the probability that a request +// will be throttled using recent history. Server requests (both throttled and +// accepted) are registered with the throttler (via the RegisterBackendResponse +// method), which then recommends client-side throttling (via the +// ShouldThrottle method) with probability given by: +// (requests - RatioForAccepts * accepts) / (requests + RequestsPadding) +type Throttler struct { + ratioForAccepts float64 + requestsPadding float64 + + // Number of total accepts and throttles in the lookback period. + mu sync.Mutex + accepts *lookback + throttles *lookback +} + +// New initializes a new adaptive throttler with the default values. +func New() *Throttler { + return newWithArgs(defaultDuration, defaultBins, defaultRatioForAccepts, defaultRequestsPadding) +} + +// newWithArgs initializes a new adaptive throttler with the provided values. +// Used only in unittests. +func newWithArgs(duration time.Duration, bins int64, ratioForAccepts, requestsPadding float64) *Throttler { + return &Throttler{ + ratioForAccepts: ratioForAccepts, + requestsPadding: requestsPadding, + accepts: newLookback(bins, duration), + throttles: newLookback(bins, duration), + } +} + +// ShouldThrottle returns a probabilistic estimate of whether the server would +// throttle the next request. This should be called for every request before +// allowing it to hit the network. If the returned value is true, the request +// should be aborted immediately (as if it had been throttled by the server). +func (t *Throttler) ShouldThrottle() bool { + randomProbability := randFunc() + now := timeNowFunc() + + t.mu.Lock() + defer t.mu.Unlock() + + accepts, throttles := float64(t.accepts.sum(now)), float64(t.throttles.sum(now)) + requests := accepts + throttles + throttleProbability := (requests - t.ratioForAccepts*accepts) / (requests + t.requestsPadding) + if throttleProbability <= randomProbability { + return false + } + + t.throttles.add(now, 1) + return true +} + +// RegisterBackendResponse registers a response received from the backend for a +// request allowed by ShouldThrottle. This should be called for every response +// received from the backend (i.e., once for each request for which +// ShouldThrottle returned false). +func (t *Throttler) RegisterBackendResponse(throttled bool) { + now := timeNowFunc() + + t.mu.Lock() + if throttled { + t.throttles.add(now, 1) + } else { + t.accepts.add(now, 1) + } + t.mu.Unlock() +} diff --git a/balancer/rls/internal/adaptive/adaptive_test.go b/balancer/rls/internal/adaptive/adaptive_test.go new file mode 100644 index 00000000..40a846a3 --- /dev/null +++ b/balancer/rls/internal/adaptive/adaptive_test.go @@ -0,0 +1,231 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package adaptive + +import ( + "sync" + "testing" + "time" +) + +// stats returns a tuple with accepts, throttles for the current time. +func (th *Throttler) stats() (int64, int64) { + now := timeNowFunc() + + th.mu.Lock() + a, t := th.accepts.sum(now), th.throttles.sum(now) + th.mu.Unlock() + return a, t +} + +// Enums for responses. +const ( + E = iota // No response + A // Accepted + T // Throttled +) + +func TestRegisterBackendResponse(t *testing.T) { + testcases := []struct { + desc string + bins int64 + ticks []int64 + responses []int64 + wantAccepts []int64 + wantThrottled []int64 + }{ + { + "Accumulate", + 3, + []int64{0, 1, 2}, // Ticks + []int64{A, T, E}, // Responses + []int64{1, 1, 1}, // Accepts + []int64{0, 1, 1}, // Throttled + }, + { + "LightTimeTravel", + 3, + []int64{1, 0, 2}, // Ticks + []int64{A, T, E}, // Response + []int64{1, 1, 1}, // Accepts + []int64{0, 1, 1}, // Throttled + }, + { + "HeavyTimeTravel", + 3, + []int64{8, 0, 9}, // Ticks + []int64{A, A, A}, // Response + []int64{1, 1, 2}, // Accepts + []int64{0, 0, 0}, // Throttled + }, + { + "Rollover", + 1, + []int64{0, 1, 2}, // Ticks + []int64{A, T, E}, // Responses + []int64{1, 0, 0}, // Accepts + []int64{0, 1, 0}, // Throttled + }, + } + + m := mockClock{} + oldTimeNowFunc := timeNowFunc + timeNowFunc = m.Now + defer func() { timeNowFunc = oldTimeNowFunc }() + + for _, test := range testcases { + t.Run(test.desc, func(t *testing.T) { + th := newWithArgs(time.Duration(test.bins), test.bins, 2.0, 8) + for i, tick := range test.ticks { + m.SetNanos(tick) + + if test.responses[i] != E { + th.RegisterBackendResponse(test.responses[i] == T) + } + + if gotAccepts, gotThrottled := th.stats(); gotAccepts != test.wantAccepts[i] || gotThrottled != test.wantThrottled[i] { + t.Errorf("th.stats() = {%d, %d} for index %d, want {%d, %d}", i, gotAccepts, gotThrottled, test.wantAccepts[i], test.wantThrottled[i]) + } + } + }) + } +} + +func TestShouldThrottleOptions(t *testing.T) { + // ShouldThrottle should return true iff + // (requests - RatioForAccepts * accepts) / (requests + RequestsPadding) <= p + // where p is a random number. For the purposes of this test it's fixed + // to 0.5. + responses := []int64{T, T, T, T, T, T, T, T, T, A, A, A, A, A, A, T, T, T, T} + + n := false + y := true + + testcases := []struct { + desc string + ratioForAccepts float64 + requestsPadding float64 + want []bool + }{ + { + "Baseline", + 1.1, + 8, + []bool{n, n, n, n, n, n, n, n, y, y, y, y, y, n, n, n, y, y, y}, + }, + { + "ChangePadding", + 1.1, + 7, + []bool{n, n, n, n, n, n, n, y, y, y, y, y, y, y, y, y, y, y, y}, + }, + { + "ChangeRatioForAccepts", + 1.4, + 8, + []bool{n, n, n, n, n, n, n, n, y, y, n, n, n, n, n, n, n, n, n}, + }, + } + + m := mockClock{} + oldTimeNowFunc := timeNowFunc + timeNowFunc = m.Now + oldRandFunc := randFunc + randFunc = func() float64 { return 0.5 } + defer func() { + timeNowFunc = oldTimeNowFunc + randFunc = oldRandFunc + }() + + for _, test := range testcases { + t.Run(test.desc, func(t *testing.T) { + m.SetNanos(0) + th := newWithArgs(time.Duration(time.Nanosecond), 1, test.ratioForAccepts, test.requestsPadding) + for i, response := range responses { + if response != E { + th.RegisterBackendResponse(response == T) + } + if got := th.ShouldThrottle(); got != test.want[i] { + t.Errorf("ShouldThrottle for index %d: got %v, want %v", i, got, test.want[i]) + } + } + }) + } +} + +func TestParallel(t *testing.T) { + // Uses all the defaults which comes with a 30 second duration. + th := New() + + testDuration := 2 * time.Second + numRoutines := 10 + accepts := make([]int64, numRoutines) + throttles := make([]int64, numRoutines) + var wg sync.WaitGroup + for i := 0; i < numRoutines; i++ { + wg.Add(1) + go func(num int) { + defer wg.Done() + + ticker := time.NewTicker(testDuration) + var accept int64 + var throttle int64 + for i := 0; ; i++ { + select { + case <-ticker.C: + ticker.Stop() + accepts[num] = accept + throttles[num] = throttle + return + default: + if i%2 == 0 { + th.RegisterBackendResponse(true) + throttle++ + } else { + th.RegisterBackendResponse(false) + accept++ + } + } + } + }(i) + } + wg.Wait() + + var wantAccepts, wantThrottles int64 + for i := 0; i < numRoutines; i++ { + wantAccepts += accepts[i] + wantThrottles += throttles[i] + } + + if gotAccepts, gotThrottles := th.stats(); gotAccepts != wantAccepts || gotThrottles != wantThrottles { + t.Errorf("th.stats() = {%d, %d}, want {%d, %d}", gotAccepts, gotThrottles, wantAccepts, wantThrottles) + } +} + +type mockClock struct { + t time.Time +} + +func (m *mockClock) Now() time.Time { + return m.t +} + +func (m *mockClock) SetNanos(n int64) { + m.t = time.Unix(0, n) +} diff --git a/balancer/rls/internal/adaptive/lookback.go b/balancer/rls/internal/adaptive/lookback.go new file mode 100644 index 00000000..13b316b7 --- /dev/null +++ b/balancer/rls/internal/adaptive/lookback.go @@ -0,0 +1,91 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package adaptive + +import "time" + +// lookback implements a moving sum over an int64 timeline. +type lookback struct { + bins int64 // Number of bins to use for lookback. + width time.Duration // Width of each bin. + + head int64 // Absolute bin index (time * bins / duration) of the current head bin. + total int64 // Sum over all the values in buf, within the lookback window behind head. + buf []int64 // Ring buffer for keeping track of the sum elements. +} + +// newLookback creates a new lookback for the given duration with a set number +// of bins. +func newLookback(bins int64, duration time.Duration) *lookback { + return &lookback{ + bins: bins, + width: duration / time.Duration(bins), + buf: make([]int64, bins), + } +} + +// add is used to increment the lookback sum. +func (l *lookback) add(t time.Time, v int64) { + pos := l.advance(t) + + if (l.head - pos) >= l.bins { + // Do not increment counters if pos is more than bins behind head. + return + } + l.buf[pos%l.bins] += v + l.total += v +} + +// sum returns the sum of the lookback buffer at the given time or head, +// whichever is greater. +func (l *lookback) sum(t time.Time) int64 { + l.advance(t) + return l.total +} + +// advance prepares the lookback buffer for calls to add() or sum() at time t. +// If head is greater than t then the lookback buffer will be untouched. The +// absolute bin index corresponding to t is returned. It will always be less +// than or equal to head. +func (l *lookback) advance(t time.Time) int64 { + ch := l.head // Current head bin index. + nh := t.UnixNano() / l.width.Nanoseconds() // New head bin index. + + if nh <= ch { + // Either head unchanged or clock jitter (time has moved backwards). Do + // not advance. + return nh + } + + jmax := min(l.bins, nh-ch) + for j := int64(0); j < jmax; j++ { + i := (ch + j + 1) % l.bins + l.total -= l.buf[i] + l.buf[i] = 0 + } + l.head = nh + return nh +} + +func min(x int64, y int64) int64 { + if x < y { + return x + } + return y +} diff --git a/balancer/rls/internal/adaptive/lookback_test.go b/balancer/rls/internal/adaptive/lookback_test.go new file mode 100644 index 00000000..3e441f51 --- /dev/null +++ b/balancer/rls/internal/adaptive/lookback_test.go @@ -0,0 +1,85 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package adaptive + +import ( + "testing" + "time" +) + +func TestLookback(t *testing.T) { + makeTicks := func(offsets []int64) []time.Time { + var ticks []time.Time + now := time.Now() + for _, offset := range offsets { + ticks = append(ticks, now.Add(time.Duration(offset))) + } + return ticks + } + + // lookback.add and lookback.sum behave correctly. + testcases := []struct { + desc string + bins int64 + ticks []time.Time + values []int64 + want []int64 + }{ + { + "Accumulate", + 3, + makeTicks([]int64{0, 1, 2}), // Ticks + []int64{1, 2, 3}, // Values + []int64{1, 3, 6}, // Want + }, + { + "LightTimeTravel", + 3, + makeTicks([]int64{1, 0, 2}), // Ticks + []int64{1, 2, 3}, // Values + []int64{1, 3, 6}, // Want + }, + { + "HeavyTimeTravel", + 3, + makeTicks([]int64{8, 0, 9}), // Ticks + []int64{1, 2, 3}, // Values + []int64{1, 1, 4}, // Want + }, + { + "Rollover", + 1, + makeTicks([]int64{0, 1, 2}), // Ticks + []int64{1, 2, 3}, // Values + []int64{1, 2, 3}, // Want + }, + } + + for _, test := range testcases { + t.Run(test.desc, func(t *testing.T) { + lb := newLookback(test.bins, time.Duration(test.bins)) + for i, tick := range test.ticks { + lb.add(tick, test.values[i]) + if got := lb.sum(tick); got != test.want[i] { + t.Errorf("sum for index %d got %d, want %d", i, got, test.want[i]) + } + } + }) + } +}