xds: stop expiry timer after receiving an update ()

This commit is contained in:
Menghan Li
2020-05-13 11:03:21 -07:00
committed by GitHub
parent a6ab4473c5
commit 36493bc375
7 changed files with 132 additions and 17 deletions

@ -24,6 +24,8 @@ type watcherInfoWithUpdate struct {
err error err error
} }
// scheduleCallback should only be called by methods of watchInfo, which checks
// for watcher states and maintain consistency.
func (c *Client) scheduleCallback(wi *watchInfo, update interface{}, err error) { func (c *Client) scheduleCallback(wi *watchInfo, update interface{}, err error) {
c.updateCh.Put(&watcherInfoWithUpdate{ c.updateCh.Put(&watcherInfoWithUpdate{
wi: wi, wi: wi,
@ -79,7 +81,7 @@ func (c *Client) newLDSUpdate(d map[string]ldsUpdate) {
for name, update := range d { for name, update := range d {
if s, ok := c.ldsWatchers[name]; ok { if s, ok := c.ldsWatchers[name]; ok {
for wi := range s { for wi := range s {
c.scheduleCallback(wi, update, nil) wi.newUpdate(update)
} }
// Sync cache. // Sync cache.
c.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, update) c.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, update)
@ -105,7 +107,7 @@ func (c *Client) newRDSUpdate(d map[string]rdsUpdate) {
for name, update := range d { for name, update := range d {
if s, ok := c.rdsWatchers[name]; ok { if s, ok := c.rdsWatchers[name]; ok {
for wi := range s { for wi := range s {
c.scheduleCallback(wi, update, nil) wi.newUpdate(update)
} }
// Sync cache. // Sync cache.
c.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, update) c.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, update)
@ -126,7 +128,7 @@ func (c *Client) newCDSUpdate(d map[string]ClusterUpdate) {
for name, update := range d { for name, update := range d {
if s, ok := c.cdsWatchers[name]; ok { if s, ok := c.cdsWatchers[name]; ok {
for wi := range s { for wi := range s {
c.scheduleCallback(wi, update, nil) wi.newUpdate(update)
} }
// Sync cache. // Sync cache.
c.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, update) c.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, update)
@ -152,7 +154,7 @@ func (c *Client) newEDSUpdate(d map[string]EndpointsUpdate) {
for name, update := range d { for name, update := range d {
if s, ok := c.edsWatchers[name]; ok { if s, ok := c.edsWatchers[name]; ok {
for wi := range s { for wi := range s {
c.scheduleCallback(wi, update, nil) wi.newUpdate(update)
} }
// Sync cache. // Sync cache.
c.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, update) c.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, update)

@ -19,6 +19,8 @@
package client package client
import ( import (
"fmt"
"sync"
"time" "time"
) )
@ -33,8 +35,18 @@ const (
edsURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment" edsURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
) )
type watchInfoState int
const (
watchInfoStateStarted watchInfoState = iota
watchInfoStateRespReceived
watchInfoStateTimeout
watchInfoStateCanceled
)
// watchInfo holds all the information from a watch() call. // watchInfo holds all the information from a watch() call.
type watchInfo struct { type watchInfo struct {
c *Client
typeURL string typeURL string
target string target string
@ -42,7 +54,63 @@ type watchInfo struct {
rdsCallback rdsCallbackFunc rdsCallback rdsCallbackFunc
cdsCallback func(ClusterUpdate, error) cdsCallback func(ClusterUpdate, error)
edsCallback func(EndpointsUpdate, error) edsCallback func(EndpointsUpdate, error)
expiryTimer *time.Timer expiryTimer *time.Timer
// mu protects state, and c.scheduleCallback().
// - No callback should be scheduled after watchInfo is canceled.
// - No timeout error should be scheduled after watchInfo is resp received.
mu sync.Mutex
state watchInfoState
}
func (wi *watchInfo) newUpdate(update interface{}) {
wi.mu.Lock()
defer wi.mu.Unlock()
if wi.state == watchInfoStateCanceled {
return
}
wi.state = watchInfoStateRespReceived
wi.expiryTimer.Stop()
wi.c.scheduleCallback(wi, update, nil)
}
func (wi *watchInfo) timeout() {
wi.mu.Lock()
defer wi.mu.Unlock()
if wi.state == watchInfoStateCanceled || wi.state == watchInfoStateRespReceived {
return
}
wi.state = watchInfoStateTimeout
var (
u interface{}
t string
)
switch wi.typeURL {
case ldsURL:
u = ldsUpdate{}
t = "LDS"
case rdsURL:
u = rdsUpdate{}
t = "RDS"
case cdsURL:
u = ClusterUpdate{}
t = "CDS"
case edsURL:
u = EndpointsUpdate{}
t = "EDS"
}
wi.c.scheduleCallback(wi, u, fmt.Errorf("xds: %s target %s not found, watcher timeout", t, wi.target))
}
func (wi *watchInfo) cancel() {
wi.mu.Lock()
defer wi.mu.Unlock()
if wi.state == watchInfoStateCanceled {
return
}
wi.expiryTimer.Stop()
wi.state = watchInfoStateCanceled
} }
func (c *Client) watch(wi *watchInfo) (cancel func()) { func (c *Client) watch(wi *watchInfo) (cancel func()) {
@ -83,31 +151,31 @@ func (c *Client) watch(wi *watchInfo) (cancel func()) {
case ldsURL: case ldsURL:
if v, ok := c.ldsCache[resourceName]; ok { if v, ok := c.ldsCache[resourceName]; ok {
c.logger.Debugf("LDS resource with name %v found in cache: %+v", wi.target, v) c.logger.Debugf("LDS resource with name %v found in cache: %+v", wi.target, v)
c.scheduleCallback(wi, v, nil) wi.newUpdate(v)
} }
case rdsURL: case rdsURL:
if v, ok := c.rdsCache[resourceName]; ok { if v, ok := c.rdsCache[resourceName]; ok {
c.logger.Debugf("RDS resource with name %v found in cache: %+v", wi.target, v) c.logger.Debugf("RDS resource with name %v found in cache: %+v", wi.target, v)
c.scheduleCallback(wi, v, nil) wi.newUpdate(v)
} }
case cdsURL: case cdsURL:
if v, ok := c.cdsCache[resourceName]; ok { if v, ok := c.cdsCache[resourceName]; ok {
c.logger.Debugf("CDS resource with name %v found in cache: %+v", wi.target, v) c.logger.Debugf("CDS resource with name %v found in cache: %+v", wi.target, v)
c.scheduleCallback(wi, v, nil) wi.newUpdate(v)
} }
case edsURL: case edsURL:
if v, ok := c.edsCache[resourceName]; ok { if v, ok := c.edsCache[resourceName]; ok {
c.logger.Debugf("EDS resource with name %v found in cache: %+v", wi.target, v) c.logger.Debugf("EDS resource with name %v found in cache: %+v", wi.target, v)
c.scheduleCallback(wi, v, nil) wi.newUpdate(v)
} }
} }
return func() { return func() {
c.logger.Debugf("watch for type %v, resource name %v canceled", wi.typeURL, wi.target) c.logger.Debugf("watch for type %v, resource name %v canceled", wi.typeURL, wi.target)
wi.cancel()
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
if s := watchers[resourceName]; s != nil { if s := watchers[resourceName]; s != nil {
wi.expiryTimer.Stop()
// Remove this watcher, so it's callback will not be called in the // Remove this watcher, so it's callback will not be called in the
// future. // future.
delete(s, wi) delete(s, wi)

@ -19,7 +19,6 @@
package client package client
import ( import (
"fmt"
"time" "time"
) )
@ -44,13 +43,14 @@ type ClusterUpdate struct {
// after the watcher is canceled. The caller needs to handle this case. // after the watcher is canceled. The caller needs to handle this case.
func (c *Client) WatchCluster(clusterName string, cb func(ClusterUpdate, error)) (cancel func()) { func (c *Client) WatchCluster(clusterName string, cb func(ClusterUpdate, error)) (cancel func()) {
wi := &watchInfo{ wi := &watchInfo{
c: c,
typeURL: cdsURL, typeURL: cdsURL,
target: clusterName, target: clusterName,
cdsCallback: cb, cdsCallback: cb,
} }
wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
c.scheduleCallback(wi, ClusterUpdate{}, fmt.Errorf("xds: CDS target %s not found, watcher timeout", clusterName)) wi.timeout()
}) })
return c.watch(wi) return c.watch(wi)
} }

@ -44,6 +44,9 @@ func (s) TestClusterWatch(t *testing.T) {
} }
defer c.Close() defer c.Close()
// TODO: add a timeout to this recv.
// Note that this won't be necessary if we finish the TODO below to call
// Client directly instead of v2Client.r.
v2Client := <-v2ClientCh v2Client := <-v2ClientCh
clusterUpdateCh := testutils.NewChannel() clusterUpdateCh := testutils.NewChannel()
@ -274,3 +277,45 @@ func (s) TestClusterWatchExpiryTimer(t *testing.T) {
t.Errorf("unexpected clusterError: <nil>, want error watcher timeout") t.Errorf("unexpected clusterError: <nil>, want error watcher timeout")
} }
} }
// TestClusterWatchExpiryTimerStop tests the case where the client does receive
// an CDS response for the request that it sends out. We want no error even
// after expiry timeout.
func (s) TestClusterWatchExpiryTimerStop(t *testing.T) {
oldWatchExpiryTimeout := defaultWatchExpiryTimeout
defaultWatchExpiryTimeout = 500 * time.Millisecond
defer func() {
defaultWatchExpiryTimeout = oldWatchExpiryTimeout
}()
v2ClientCh, cleanup := overrideNewXDSV2Client()
defer cleanup()
c, err := New(clientOpts(testXDSServer))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
defer c.Close()
v2Client := <-v2ClientCh
clusterUpdateCh := testutils.NewChannel()
c.WatchCluster(testCDSName, func(u ClusterUpdate, err error) {
clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err})
})
wantUpdate := ClusterUpdate{ServiceName: testEDSName}
v2Client.r.newCDSUpdate(map[string]ClusterUpdate{
testCDSName: wantUpdate,
})
if u, err := clusterUpdateCh.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
}
// Wait for an error, the error should never happen.
u, err := clusterUpdateCh.TimedReceive(defaultWatchExpiryTimeout * 2)
if err != testutils.ErrRecvTimeout {
t.Fatalf("got unexpected: %v, %v, want recv timeout", u.(clusterUpdateErr).u, u.(clusterUpdateErr).err)
}
}

@ -19,7 +19,6 @@
package client package client
import ( import (
"fmt"
"time" "time"
"google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal"
@ -81,13 +80,14 @@ type EndpointsUpdate struct {
// after the watcher is canceled. The caller needs to handle this case. // after the watcher is canceled. The caller needs to handle this case.
func (c *Client) WatchEndpoints(clusterName string, cb func(EndpointsUpdate, error)) (cancel func()) { func (c *Client) WatchEndpoints(clusterName string, cb func(EndpointsUpdate, error)) (cancel func()) {
wi := &watchInfo{ wi := &watchInfo{
c: c,
typeURL: edsURL, typeURL: edsURL,
target: clusterName, target: clusterName,
edsCallback: cb, edsCallback: cb,
} }
wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
c.scheduleCallback(wi, EndpointsUpdate{}, fmt.Errorf("xds: EDS target %s not found, watcher timeout", clusterName)) wi.timeout()
}) })
return c.watch(wi) return c.watch(wi)
} }

@ -19,7 +19,6 @@
package client package client
import ( import (
"fmt"
"time" "time"
) )
@ -35,13 +34,14 @@ type ldsCallbackFunc func(ldsUpdate, error)
// after the watcher is canceled. The caller needs to handle this case. // after the watcher is canceled. The caller needs to handle this case.
func (c *Client) watchLDS(serviceName string, cb ldsCallbackFunc) (cancel func()) { func (c *Client) watchLDS(serviceName string, cb ldsCallbackFunc) (cancel func()) {
wi := &watchInfo{ wi := &watchInfo{
c: c,
typeURL: ldsURL, typeURL: ldsURL,
target: serviceName, target: serviceName,
ldsCallback: cb, ldsCallback: cb,
} }
wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
c.scheduleCallback(wi, ldsUpdate{}, fmt.Errorf("xds: LDS target %s not found, watcher timeout", serviceName)) wi.timeout()
}) })
return c.watch(wi) return c.watch(wi)
} }

@ -19,7 +19,6 @@
package client package client
import ( import (
"fmt"
"time" "time"
) )
@ -35,13 +34,14 @@ type rdsCallbackFunc func(rdsUpdate, error)
// after the watcher is canceled. The caller needs to handle this case. // after the watcher is canceled. The caller needs to handle this case.
func (c *Client) watchRDS(routeName string, cb rdsCallbackFunc) (cancel func()) { func (c *Client) watchRDS(routeName string, cb rdsCallbackFunc) (cancel func()) {
wi := &watchInfo{ wi := &watchInfo{
c: c,
typeURL: rdsURL, typeURL: rdsURL,
target: routeName, target: routeName,
rdsCallback: cb, rdsCallback: cb,
} }
wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
c.scheduleCallback(wi, rdsUpdate{}, fmt.Errorf("xds: RDS target %s not found, watcher timeout", routeName)) wi.timeout()
}) })
return c.watch(wi) return c.watch(wi)
} }