xds: cache locality (sub-balancer) (#3067)
When a locality is removed from EDS response, it's corresponding sub-balancer will be removed from balancer group. With this change, the sub-balancer won't be removed immediately. It will be kept in a cache (for 15 minutes by default). If the locality is re-added within the timeout, the sub-balancer in cache will be picked and re-used.
This commit is contained in:
131
internal/cache/timeoutCache.go
vendored
Normal file
131
internal/cache/timeoutCache.go
vendored
Normal file
@ -0,0 +1,131 @@
|
||||
/*
|
||||
* Copyright 2019 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
// Package cache implements caches to be used in gRPC.
|
||||
package cache
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type cacheEntry struct {
|
||||
item interface{}
|
||||
callback func()
|
||||
timer *time.Timer
|
||||
// deleted is set to true in Remove() when the call to timer.Stop() fails.
|
||||
// This can happen when the timer in the cache entry fires around the same
|
||||
// time that timer.stop() is called in Remove().
|
||||
deleted bool
|
||||
}
|
||||
|
||||
// TimeoutCache is a cache with items to be deleted after a timeout.
|
||||
type TimeoutCache struct {
|
||||
mu sync.Mutex
|
||||
timeout time.Duration
|
||||
cache map[interface{}]*cacheEntry
|
||||
}
|
||||
|
||||
// NewTimeoutCache creates a TimeoutCache with the given timeout.
|
||||
func NewTimeoutCache(timeout time.Duration) *TimeoutCache {
|
||||
return &TimeoutCache{
|
||||
timeout: timeout,
|
||||
cache: make(map[interface{}]*cacheEntry),
|
||||
}
|
||||
}
|
||||
|
||||
// Add adds an item to the cache, with the specified callback to be called when
|
||||
// the item is removed from the cache upon timeout. If the item is removed from
|
||||
// the cache using a call to Remove before the timeout expires, the callback
|
||||
// will not be called.
|
||||
//
|
||||
// If the Add was successful, it returns (newly added item, true). If there is
|
||||
// an existing entry for the specified key, the cache entry is not be updated
|
||||
// with the specified item and it returns (existing item, false).
|
||||
func (c *TimeoutCache) Add(key, item interface{}, callback func()) (interface{}, bool) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if e, ok := c.cache[key]; ok {
|
||||
return e.item, false
|
||||
}
|
||||
|
||||
entry := &cacheEntry{
|
||||
item: item,
|
||||
callback: callback,
|
||||
}
|
||||
entry.timer = time.AfterFunc(c.timeout, func() {
|
||||
c.mu.Lock()
|
||||
if entry.deleted {
|
||||
c.mu.Unlock()
|
||||
// Abort the delete since this has been taken care of in Remove().
|
||||
return
|
||||
}
|
||||
delete(c.cache, key)
|
||||
c.mu.Unlock()
|
||||
entry.callback()
|
||||
})
|
||||
c.cache[key] = entry
|
||||
return item, true
|
||||
}
|
||||
|
||||
// Remove the item with the key from the cache.
|
||||
//
|
||||
// If the specified key exists in the cache, it returns (item associated with
|
||||
// key, true) and the callback associated with the item is guaranteed to be not
|
||||
// called. If the given key is not found in the cache, it returns (nil, false)
|
||||
func (c *TimeoutCache) Remove(key interface{}) (item interface{}, ok bool) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
entry, ok := c.removeInternal(key, false)
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
return entry.item, true
|
||||
}
|
||||
|
||||
// removeInternal removes and returns the item with key.
|
||||
//
|
||||
// caller must hold c.mu.
|
||||
func (c *TimeoutCache) removeInternal(key interface{}, runCallback bool) (*cacheEntry, bool) {
|
||||
entry, ok := c.cache[key]
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
delete(c.cache, key)
|
||||
if !entry.timer.Stop() {
|
||||
// If stop was not successful, the timer has fired (this can only happen
|
||||
// in a race). But the deleting function is blocked on c.mu because the
|
||||
// mutex was held by the caller of this function.
|
||||
//
|
||||
// Set deleted to true to abort the deleting function. When the lock is
|
||||
// released, the delete function will acquire the lock, check the value
|
||||
// of deleted and return.
|
||||
entry.deleted = true
|
||||
}
|
||||
if runCallback {
|
||||
entry.callback()
|
||||
}
|
||||
return entry, true
|
||||
}
|
||||
|
||||
// Clear removes all entries, and runs the callbacks if runCallback is true.
|
||||
func (c *TimeoutCache) Clear(runCallback bool) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
for key := range c.cache {
|
||||
c.removeInternal(key, runCallback)
|
||||
}
|
||||
}
|
220
internal/cache/timeoutCache_test.go
vendored
Normal file
220
internal/cache/timeoutCache_test.go
vendored
Normal file
@ -0,0 +1,220 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2019 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
testCacheTimeout = 100 * time.Millisecond
|
||||
)
|
||||
|
||||
func (c *TimeoutCache) getForTesting(key interface{}) (*cacheEntry, bool) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
r, ok := c.cache[key]
|
||||
return r, ok
|
||||
}
|
||||
|
||||
// TestCacheExpire attempts to add an entry to the cache and verifies that it
|
||||
// was added successfully. It then makes sure that on timeout, it's removed and
|
||||
// the associated callback is called.
|
||||
func TestCacheExpire(t *testing.T) {
|
||||
const k, v = 1, "1"
|
||||
c := NewTimeoutCache(testCacheTimeout)
|
||||
|
||||
callbackChan := make(chan struct{})
|
||||
c.Add(k, v, func() { close(callbackChan) })
|
||||
|
||||
if gotV, ok := c.getForTesting(k); !ok || gotV.item != v {
|
||||
t.Fatalf("After Add(), before timeout, from cache got: %v, %v, want %v, %v", gotV.item, ok, v, true)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-callbackChan:
|
||||
case <-time.After(testCacheTimeout * 2):
|
||||
t.Fatalf("timeout waiting for callback")
|
||||
}
|
||||
|
||||
if _, ok := c.getForTesting(k); ok {
|
||||
t.Fatalf("After Add(), after timeout, from cache got: _, %v, want _, %v", ok, false)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCacheRemove attempts to remove an existing entry from the cache and
|
||||
// verifies that the entry is removed and the associated callback is not
|
||||
// invoked.
|
||||
func TestCacheRemove(t *testing.T) {
|
||||
const k, v = 1, "1"
|
||||
c := NewTimeoutCache(testCacheTimeout)
|
||||
|
||||
callbackChan := make(chan struct{})
|
||||
c.Add(k, v, func() { close(callbackChan) })
|
||||
|
||||
if got, ok := c.getForTesting(k); !ok || got.item != v {
|
||||
t.Fatalf("After Add(), before timeout, from cache got: %v, %v, want %v, %v", got.item, ok, v, true)
|
||||
}
|
||||
|
||||
time.Sleep(testCacheTimeout / 2)
|
||||
|
||||
gotV, gotOK := c.Remove(k)
|
||||
if !gotOK || gotV != v {
|
||||
t.Fatalf("After Add(), before timeout, Remove() got: %v, %v, want %v, %v", gotV, gotOK, v, true)
|
||||
}
|
||||
|
||||
if _, ok := c.getForTesting(k); ok {
|
||||
t.Fatalf("After Add(), before timeout, after Remove(), from cache got: _, %v, want _, %v", ok, false)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-callbackChan:
|
||||
t.Fatalf("unexpected callback after retrieve")
|
||||
case <-time.After(testCacheTimeout * 2):
|
||||
}
|
||||
}
|
||||
|
||||
// TestCacheClearWithoutCallback attempts to clear all entries from the cache
|
||||
// and verifies that the associated callbacks are not invoked.
|
||||
func TestCacheClearWithoutCallback(t *testing.T) {
|
||||
var values []string
|
||||
const itemCount = 3
|
||||
for i := 0; i < itemCount; i++ {
|
||||
values = append(values, strconv.Itoa(i))
|
||||
}
|
||||
c := NewTimeoutCache(testCacheTimeout)
|
||||
|
||||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
callbackChan := make(chan struct{}, itemCount)
|
||||
|
||||
for i, v := range values {
|
||||
callbackChanTemp := make(chan struct{})
|
||||
c.Add(i, v, func() { close(callbackChanTemp) })
|
||||
go func() {
|
||||
select {
|
||||
case <-callbackChanTemp:
|
||||
callbackChan <- struct{}{}
|
||||
case <-done:
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
for i, v := range values {
|
||||
if got, ok := c.getForTesting(i); !ok || got.item != v {
|
||||
t.Fatalf("After Add(), before timeout, from cache got: %v, %v, want %v, %v", got.item, ok, v, true)
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(testCacheTimeout / 2)
|
||||
c.Clear(false)
|
||||
|
||||
for i := range values {
|
||||
if _, ok := c.getForTesting(i); ok {
|
||||
t.Fatalf("After Add(), before timeout, after Remove(), from cache got: _, %v, want _, %v", ok, false)
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-callbackChan:
|
||||
t.Fatalf("unexpected callback after Clear")
|
||||
case <-time.After(testCacheTimeout * 2):
|
||||
}
|
||||
}
|
||||
|
||||
// TestCacheClearWithCallback attempts to clear all entries from the cache and
|
||||
// verifies that the associated callbacks are invoked.
|
||||
func TestCacheClearWithCallback(t *testing.T) {
|
||||
var values []string
|
||||
const itemCount = 3
|
||||
for i := 0; i < itemCount; i++ {
|
||||
values = append(values, strconv.Itoa(i))
|
||||
}
|
||||
c := NewTimeoutCache(time.Hour)
|
||||
|
||||
testDone := make(chan struct{})
|
||||
defer close(testDone)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(itemCount)
|
||||
for i, v := range values {
|
||||
callbackChanTemp := make(chan struct{})
|
||||
c.Add(i, v, func() { close(callbackChanTemp) })
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
select {
|
||||
case <-callbackChanTemp:
|
||||
case <-testDone:
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
allGoroutineDone := make(chan struct{}, itemCount)
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(allGoroutineDone)
|
||||
}()
|
||||
|
||||
for i, v := range values {
|
||||
if got, ok := c.getForTesting(i); !ok || got.item != v {
|
||||
t.Fatalf("After Add(), before timeout, from cache got: %v, %v, want %v, %v", got.item, ok, v, true)
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(testCacheTimeout / 2)
|
||||
c.Clear(true)
|
||||
|
||||
for i := range values {
|
||||
if _, ok := c.getForTesting(i); ok {
|
||||
t.Fatalf("After Add(), before timeout, after Remove(), from cache got: _, %v, want _, %v", ok, false)
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-allGoroutineDone:
|
||||
case <-time.After(testCacheTimeout * 2):
|
||||
t.Fatalf("timeout waiting for all callbacks")
|
||||
}
|
||||
}
|
||||
|
||||
// TestCacheRetrieveTimeoutRace simulates the case where an entry's timer fires
|
||||
// around the same time that Remove() is called for it. It verifies that there
|
||||
// is no deadlock.
|
||||
func TestCacheRetrieveTimeoutRace(t *testing.T) {
|
||||
c := NewTimeoutCache(time.Nanosecond)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for i := 0; i < 1000; i++ {
|
||||
// Add starts a timer with 1 ns timeout, then remove will race
|
||||
// with the timer.
|
||||
c.Add(i, strconv.Itoa(i), func() {})
|
||||
c.Remove(i)
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("Test didn't finish within 1 second. Deadlock")
|
||||
case <-done:
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user