rls: Implementation of the adaptive client-side throttler. (#3346)

This commit is contained in:
Easwar Swaminathan
2020-02-05 03:32:31 -08:00
committed by GitHub
parent 648cf9b00e
commit 13535f71d1
4 changed files with 539 additions and 0 deletions

View File

@ -0,0 +1,132 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package adaptive provides functionality for adaptive client-side throttling.
package adaptive
import (
"sync"
"time"
"google.golang.org/grpc/internal/grpcrand"
)
// For overriding in unittests.
var (
timeNowFunc = func() time.Time { return time.Now() }
randFunc = func() float64 { return grpcrand.Float64() }
)
const (
defaultDuration = 30 * time.Second
defaultBins = 100
defaultRatioForAccepts = 2.0
defaultRequestsPadding = 8.0
)
// Throttler implements a client-side throttling recommendation system. All
// methods are safe for concurrent use by multiple goroutines.
//
// The throttler has the following knobs for which we will use defaults for
// now. If there is a need to make them configurable at a later point in time,
// support for the same will be added.
// * Duration: amount of recent history that will be taken into account for
// making client-side throttling decisions. A default of 30 seconds is used.
// * Bins: number of bins to be used for bucketing historical data. A default
// of 100 is used.
// * RatioForAccepts: ratio by which accepts are multiplied, typically a value
// slightly larger than 1.0. This is used to make the throttler behave as if
// the backend had accepted more requests than it actually has, which lets us
// err on the side of sending to the backend more requests than we think it
// will accept for the sake of speeding up the propagation of state. A
// default of 2.0 is used.
// * RequestsPadding: is used to decrease the (client-side) throttling
// probability in the low QPS regime (to speed up propagation of state), as
// well as to safeguard against hitting a client-side throttling probability
// of 100%. The weight of this value decreases as the number of requests in
// recent history grows. A default of 8 is used.
//
// The adaptive throttler attempts to estimate the probability that a request
// will be throttled using recent history. Server requests (both throttled and
// accepted) are registered with the throttler (via the RegisterBackendResponse
// method), which then recommends client-side throttling (via the
// ShouldThrottle method) with probability given by:
// (requests - RatioForAccepts * accepts) / (requests + RequestsPadding)
type Throttler struct {
ratioForAccepts float64
requestsPadding float64
// Number of total accepts and throttles in the lookback period.
mu sync.Mutex
accepts *lookback
throttles *lookback
}
// New initializes a new adaptive throttler with the default values.
func New() *Throttler {
return newWithArgs(defaultDuration, defaultBins, defaultRatioForAccepts, defaultRequestsPadding)
}
// newWithArgs initializes a new adaptive throttler with the provided values.
// Used only in unittests.
func newWithArgs(duration time.Duration, bins int64, ratioForAccepts, requestsPadding float64) *Throttler {
return &Throttler{
ratioForAccepts: ratioForAccepts,
requestsPadding: requestsPadding,
accepts: newLookback(bins, duration),
throttles: newLookback(bins, duration),
}
}
// ShouldThrottle returns a probabilistic estimate of whether the server would
// throttle the next request. This should be called for every request before
// allowing it to hit the network. If the returned value is true, the request
// should be aborted immediately (as if it had been throttled by the server).
func (t *Throttler) ShouldThrottle() bool {
randomProbability := randFunc()
now := timeNowFunc()
t.mu.Lock()
defer t.mu.Unlock()
accepts, throttles := float64(t.accepts.sum(now)), float64(t.throttles.sum(now))
requests := accepts + throttles
throttleProbability := (requests - t.ratioForAccepts*accepts) / (requests + t.requestsPadding)
if throttleProbability <= randomProbability {
return false
}
t.throttles.add(now, 1)
return true
}
// RegisterBackendResponse registers a response received from the backend for a
// request allowed by ShouldThrottle. This should be called for every response
// received from the backend (i.e., once for each request for which
// ShouldThrottle returned false).
func (t *Throttler) RegisterBackendResponse(throttled bool) {
now := timeNowFunc()
t.mu.Lock()
if throttled {
t.throttles.add(now, 1)
} else {
t.accepts.add(now, 1)
}
t.mu.Unlock()
}

View File

@ -0,0 +1,231 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package adaptive
import (
"sync"
"testing"
"time"
)
// stats returns a tuple with accepts, throttles for the current time.
func (th *Throttler) stats() (int64, int64) {
now := timeNowFunc()
th.mu.Lock()
a, t := th.accepts.sum(now), th.throttles.sum(now)
th.mu.Unlock()
return a, t
}
// Enums for responses.
const (
E = iota // No response
A // Accepted
T // Throttled
)
func TestRegisterBackendResponse(t *testing.T) {
testcases := []struct {
desc string
bins int64
ticks []int64
responses []int64
wantAccepts []int64
wantThrottled []int64
}{
{
"Accumulate",
3,
[]int64{0, 1, 2}, // Ticks
[]int64{A, T, E}, // Responses
[]int64{1, 1, 1}, // Accepts
[]int64{0, 1, 1}, // Throttled
},
{
"LightTimeTravel",
3,
[]int64{1, 0, 2}, // Ticks
[]int64{A, T, E}, // Response
[]int64{1, 1, 1}, // Accepts
[]int64{0, 1, 1}, // Throttled
},
{
"HeavyTimeTravel",
3,
[]int64{8, 0, 9}, // Ticks
[]int64{A, A, A}, // Response
[]int64{1, 1, 2}, // Accepts
[]int64{0, 0, 0}, // Throttled
},
{
"Rollover",
1,
[]int64{0, 1, 2}, // Ticks
[]int64{A, T, E}, // Responses
[]int64{1, 0, 0}, // Accepts
[]int64{0, 1, 0}, // Throttled
},
}
m := mockClock{}
oldTimeNowFunc := timeNowFunc
timeNowFunc = m.Now
defer func() { timeNowFunc = oldTimeNowFunc }()
for _, test := range testcases {
t.Run(test.desc, func(t *testing.T) {
th := newWithArgs(time.Duration(test.bins), test.bins, 2.0, 8)
for i, tick := range test.ticks {
m.SetNanos(tick)
if test.responses[i] != E {
th.RegisterBackendResponse(test.responses[i] == T)
}
if gotAccepts, gotThrottled := th.stats(); gotAccepts != test.wantAccepts[i] || gotThrottled != test.wantThrottled[i] {
t.Errorf("th.stats() = {%d, %d} for index %d, want {%d, %d}", i, gotAccepts, gotThrottled, test.wantAccepts[i], test.wantThrottled[i])
}
}
})
}
}
func TestShouldThrottleOptions(t *testing.T) {
// ShouldThrottle should return true iff
// (requests - RatioForAccepts * accepts) / (requests + RequestsPadding) <= p
// where p is a random number. For the purposes of this test it's fixed
// to 0.5.
responses := []int64{T, T, T, T, T, T, T, T, T, A, A, A, A, A, A, T, T, T, T}
n := false
y := true
testcases := []struct {
desc string
ratioForAccepts float64
requestsPadding float64
want []bool
}{
{
"Baseline",
1.1,
8,
[]bool{n, n, n, n, n, n, n, n, y, y, y, y, y, n, n, n, y, y, y},
},
{
"ChangePadding",
1.1,
7,
[]bool{n, n, n, n, n, n, n, y, y, y, y, y, y, y, y, y, y, y, y},
},
{
"ChangeRatioForAccepts",
1.4,
8,
[]bool{n, n, n, n, n, n, n, n, y, y, n, n, n, n, n, n, n, n, n},
},
}
m := mockClock{}
oldTimeNowFunc := timeNowFunc
timeNowFunc = m.Now
oldRandFunc := randFunc
randFunc = func() float64 { return 0.5 }
defer func() {
timeNowFunc = oldTimeNowFunc
randFunc = oldRandFunc
}()
for _, test := range testcases {
t.Run(test.desc, func(t *testing.T) {
m.SetNanos(0)
th := newWithArgs(time.Duration(time.Nanosecond), 1, test.ratioForAccepts, test.requestsPadding)
for i, response := range responses {
if response != E {
th.RegisterBackendResponse(response == T)
}
if got := th.ShouldThrottle(); got != test.want[i] {
t.Errorf("ShouldThrottle for index %d: got %v, want %v", i, got, test.want[i])
}
}
})
}
}
func TestParallel(t *testing.T) {
// Uses all the defaults which comes with a 30 second duration.
th := New()
testDuration := 2 * time.Second
numRoutines := 10
accepts := make([]int64, numRoutines)
throttles := make([]int64, numRoutines)
var wg sync.WaitGroup
for i := 0; i < numRoutines; i++ {
wg.Add(1)
go func(num int) {
defer wg.Done()
ticker := time.NewTicker(testDuration)
var accept int64
var throttle int64
for i := 0; ; i++ {
select {
case <-ticker.C:
ticker.Stop()
accepts[num] = accept
throttles[num] = throttle
return
default:
if i%2 == 0 {
th.RegisterBackendResponse(true)
throttle++
} else {
th.RegisterBackendResponse(false)
accept++
}
}
}
}(i)
}
wg.Wait()
var wantAccepts, wantThrottles int64
for i := 0; i < numRoutines; i++ {
wantAccepts += accepts[i]
wantThrottles += throttles[i]
}
if gotAccepts, gotThrottles := th.stats(); gotAccepts != wantAccepts || gotThrottles != wantThrottles {
t.Errorf("th.stats() = {%d, %d}, want {%d, %d}", gotAccepts, gotThrottles, wantAccepts, wantThrottles)
}
}
type mockClock struct {
t time.Time
}
func (m *mockClock) Now() time.Time {
return m.t
}
func (m *mockClock) SetNanos(n int64) {
m.t = time.Unix(0, n)
}

View File

@ -0,0 +1,91 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package adaptive
import "time"
// lookback implements a moving sum over an int64 timeline.
type lookback struct {
bins int64 // Number of bins to use for lookback.
width time.Duration // Width of each bin.
head int64 // Absolute bin index (time * bins / duration) of the current head bin.
total int64 // Sum over all the values in buf, within the lookback window behind head.
buf []int64 // Ring buffer for keeping track of the sum elements.
}
// newLookback creates a new lookback for the given duration with a set number
// of bins.
func newLookback(bins int64, duration time.Duration) *lookback {
return &lookback{
bins: bins,
width: duration / time.Duration(bins),
buf: make([]int64, bins),
}
}
// add is used to increment the lookback sum.
func (l *lookback) add(t time.Time, v int64) {
pos := l.advance(t)
if (l.head - pos) >= l.bins {
// Do not increment counters if pos is more than bins behind head.
return
}
l.buf[pos%l.bins] += v
l.total += v
}
// sum returns the sum of the lookback buffer at the given time or head,
// whichever is greater.
func (l *lookback) sum(t time.Time) int64 {
l.advance(t)
return l.total
}
// advance prepares the lookback buffer for calls to add() or sum() at time t.
// If head is greater than t then the lookback buffer will be untouched. The
// absolute bin index corresponding to t is returned. It will always be less
// than or equal to head.
func (l *lookback) advance(t time.Time) int64 {
ch := l.head // Current head bin index.
nh := t.UnixNano() / l.width.Nanoseconds() // New head bin index.
if nh <= ch {
// Either head unchanged or clock jitter (time has moved backwards). Do
// not advance.
return nh
}
jmax := min(l.bins, nh-ch)
for j := int64(0); j < jmax; j++ {
i := (ch + j + 1) % l.bins
l.total -= l.buf[i]
l.buf[i] = 0
}
l.head = nh
return nh
}
func min(x int64, y int64) int64 {
if x < y {
return x
}
return y
}

View File

@ -0,0 +1,85 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package adaptive
import (
"testing"
"time"
)
func TestLookback(t *testing.T) {
makeTicks := func(offsets []int64) []time.Time {
var ticks []time.Time
now := time.Now()
for _, offset := range offsets {
ticks = append(ticks, now.Add(time.Duration(offset)))
}
return ticks
}
// lookback.add and lookback.sum behave correctly.
testcases := []struct {
desc string
bins int64
ticks []time.Time
values []int64
want []int64
}{
{
"Accumulate",
3,
makeTicks([]int64{0, 1, 2}), // Ticks
[]int64{1, 2, 3}, // Values
[]int64{1, 3, 6}, // Want
},
{
"LightTimeTravel",
3,
makeTicks([]int64{1, 0, 2}), // Ticks
[]int64{1, 2, 3}, // Values
[]int64{1, 3, 6}, // Want
},
{
"HeavyTimeTravel",
3,
makeTicks([]int64{8, 0, 9}), // Ticks
[]int64{1, 2, 3}, // Values
[]int64{1, 1, 4}, // Want
},
{
"Rollover",
1,
makeTicks([]int64{0, 1, 2}), // Ticks
[]int64{1, 2, 3}, // Values
[]int64{1, 2, 3}, // Want
},
}
for _, test := range testcases {
t.Run(test.desc, func(t *testing.T) {
lb := newLookback(test.bins, time.Duration(test.bins))
for i, tick := range test.ticks {
lb.add(tick, test.values[i])
if got := lb.sum(tick); got != test.want[i] {
t.Errorf("sum for index %d got %d, want %d", i, got, test.want[i])
}
}
})
}
}