diff --git a/xds/internal/client/client_callback.go b/xds/internal/client/client_callback.go index 3cd456ac..11dd6cca 100644 --- a/xds/internal/client/client_callback.go +++ b/xds/internal/client/client_callback.go @@ -24,6 +24,8 @@ type watcherInfoWithUpdate struct { err error } +// scheduleCallback should only be called by methods of watchInfo, which check +// for watcher states and maintain consistency. func (c *Client) scheduleCallback(wi *watchInfo, update interface{}, err error) { c.updateCh.Put(&watcherInfoWithUpdate{ wi: wi, @@ -79,7 +81,7 @@ func (c *Client) newLDSUpdate(d map[string]ldsUpdate) { for name, update := range d { if s, ok := c.ldsWatchers[name]; ok { for wi := range s { - c.scheduleCallback(wi, update, nil) + wi.newUpdate(update) } // Sync cache. c.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, update) @@ -105,7 +107,7 @@ func (c *Client) newRDSUpdate(d map[string]rdsUpdate) { for name, update := range d { if s, ok := c.rdsWatchers[name]; ok { for wi := range s { - c.scheduleCallback(wi, update, nil) + wi.newUpdate(update) } // Sync cache. c.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, update) @@ -126,7 +128,7 @@ func (c *Client) newCDSUpdate(d map[string]ClusterUpdate) { for name, update := range d { if s, ok := c.cdsWatchers[name]; ok { for wi := range s { - c.scheduleCallback(wi, update, nil) + wi.newUpdate(update) } // Sync cache. c.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, update) @@ -152,7 +154,7 @@ func (c *Client) newEDSUpdate(d map[string]EndpointsUpdate) { for name, update := range d { if s, ok := c.edsWatchers[name]; ok { for wi := range s { - c.scheduleCallback(wi, update, nil) + wi.newUpdate(update) } // Sync cache. 
c.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, update) diff --git a/xds/internal/client/client_watchers.go b/xds/internal/client/client_watchers.go index bb1e15db..7a5bd261 100644 --- a/xds/internal/client/client_watchers.go +++ b/xds/internal/client/client_watchers.go @@ -19,6 +19,8 @@ package client import ( + "fmt" + "sync" "time" ) @@ -33,8 +35,18 @@ const ( edsURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment" ) +type watchInfoState int + +const ( + watchInfoStateStarted watchInfoState = iota + watchInfoStateRespReceived + watchInfoStateTimeout + watchInfoStateCanceled +) + // watchInfo holds all the information from a watch() call. type watchInfo struct { + c *Client typeURL string target string @@ -42,7 +54,63 @@ type watchInfo struct { rdsCallback rdsCallbackFunc cdsCallback func(ClusterUpdate, error) edsCallback func(EndpointsUpdate, error) + expiryTimer *time.Timer + + // mu protects state, and c.scheduleCallback(). + // - No callback should be scheduled after watchInfo is canceled. + // - No timeout error should be scheduled after watchInfo has received a response. 
+ mu sync.Mutex + state watchInfoState +} + +func (wi *watchInfo) newUpdate(update interface{}) { + wi.mu.Lock() + defer wi.mu.Unlock() + if wi.state == watchInfoStateCanceled { + return + } + wi.state = watchInfoStateRespReceived + wi.expiryTimer.Stop() + wi.c.scheduleCallback(wi, update, nil) +} + +func (wi *watchInfo) timeout() { + wi.mu.Lock() + defer wi.mu.Unlock() + if wi.state == watchInfoStateCanceled || wi.state == watchInfoStateRespReceived { + return + } + wi.state = watchInfoStateTimeout + var ( + u interface{} + t string + ) + switch wi.typeURL { + case ldsURL: + u = ldsUpdate{} + t = "LDS" + case rdsURL: + u = rdsUpdate{} + t = "RDS" + case cdsURL: + u = ClusterUpdate{} + t = "CDS" + case edsURL: + u = EndpointsUpdate{} + t = "EDS" + } + wi.c.scheduleCallback(wi, u, fmt.Errorf("xds: %s target %s not found, watcher timeout", t, wi.target)) +} + +func (wi *watchInfo) cancel() { + wi.mu.Lock() + defer wi.mu.Unlock() + if wi.state == watchInfoStateCanceled { + return + } + wi.expiryTimer.Stop() + wi.state = watchInfoStateCanceled } func (c *Client) watch(wi *watchInfo) (cancel func()) { @@ -83,31 +151,31 @@ func (c *Client) watch(wi *watchInfo) (cancel func()) { case ldsURL: if v, ok := c.ldsCache[resourceName]; ok { c.logger.Debugf("LDS resource with name %v found in cache: %+v", wi.target, v) - c.scheduleCallback(wi, v, nil) + wi.newUpdate(v) } case rdsURL: if v, ok := c.rdsCache[resourceName]; ok { c.logger.Debugf("RDS resource with name %v found in cache: %+v", wi.target, v) - c.scheduleCallback(wi, v, nil) + wi.newUpdate(v) } case cdsURL: if v, ok := c.cdsCache[resourceName]; ok { c.logger.Debugf("CDS resource with name %v found in cache: %+v", wi.target, v) - c.scheduleCallback(wi, v, nil) + wi.newUpdate(v) } case edsURL: if v, ok := c.edsCache[resourceName]; ok { c.logger.Debugf("EDS resource with name %v found in cache: %+v", wi.target, v) - c.scheduleCallback(wi, v, nil) + wi.newUpdate(v) } } return func() { c.logger.Debugf("watch for type %v, 
resource name %v canceled", wi.typeURL, wi.target) + wi.cancel() c.mu.Lock() defer c.mu.Unlock() if s := watchers[resourceName]; s != nil { - wi.expiryTimer.Stop() // Remove this watcher, so it's callback will not be called in the // future. delete(s, wi) diff --git a/xds/internal/client/client_watchers_cluster.go b/xds/internal/client/client_watchers_cluster.go index 90d905f7..2d907113 100644 --- a/xds/internal/client/client_watchers_cluster.go +++ b/xds/internal/client/client_watchers_cluster.go @@ -19,7 +19,6 @@ package client import ( - "fmt" "time" ) @@ -44,13 +43,14 @@ type ClusterUpdate struct { // after the watcher is canceled. The caller needs to handle this case. func (c *Client) WatchCluster(clusterName string, cb func(ClusterUpdate, error)) (cancel func()) { wi := &watchInfo{ + c: c, typeURL: cdsURL, target: clusterName, cdsCallback: cb, } wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { - c.scheduleCallback(wi, ClusterUpdate{}, fmt.Errorf("xds: CDS target %s not found, watcher timeout", clusterName)) + wi.timeout() }) return c.watch(wi) } diff --git a/xds/internal/client/client_watchers_cluster_test.go b/xds/internal/client/client_watchers_cluster_test.go index 3621d8df..ab7a6209 100644 --- a/xds/internal/client/client_watchers_cluster_test.go +++ b/xds/internal/client/client_watchers_cluster_test.go @@ -44,6 +44,9 @@ func (s) TestClusterWatch(t *testing.T) { } defer c.Close() + // TODO: add a timeout to this recv. + // Note that this won't be necessary if we finish the TODO below to call + // Client directly instead of v2Client.r. v2Client := <-v2ClientCh clusterUpdateCh := testutils.NewChannel() @@ -274,3 +277,45 @@ func (s) TestClusterWatchExpiryTimer(t *testing.T) { t.Errorf("unexpected clusterError: , want error watcher timeout") } } + +// TestClusterWatchExpiryTimerStop tests the case where the client does receive +// a CDS response for the request that it sends out. We want no error even +// after expiry timeout. 
+func (s) TestClusterWatchExpiryTimerStop(t *testing.T) { + oldWatchExpiryTimeout := defaultWatchExpiryTimeout + defaultWatchExpiryTimeout = 500 * time.Millisecond + defer func() { + defaultWatchExpiryTimeout = oldWatchExpiryTimeout + }() + + v2ClientCh, cleanup := overrideNewXDSV2Client() + defer cleanup() + + c, err := New(clientOpts(testXDSServer)) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + defer c.Close() + + v2Client := <-v2ClientCh + + clusterUpdateCh := testutils.NewChannel() + c.WatchCluster(testCDSName, func(u ClusterUpdate, err error) { + clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err}) + }) + + wantUpdate := ClusterUpdate{ServiceName: testEDSName} + v2Client.r.newCDSUpdate(map[string]ClusterUpdate{ + testCDSName: wantUpdate, + }) + + if u, err := clusterUpdateCh.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) { + t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err) + } + + // Wait for an error, the error should never happen. + u, err := clusterUpdateCh.TimedReceive(defaultWatchExpiryTimeout * 2) + if err != testutils.ErrRecvTimeout { + t.Fatalf("got unexpected: %v, %v, want recv timeout", u.(clusterUpdateErr).u, u.(clusterUpdateErr).err) + } +} diff --git a/xds/internal/client/client_watchers_endpoints.go b/xds/internal/client/client_watchers_endpoints.go index db28865a..439f1824 100644 --- a/xds/internal/client/client_watchers_endpoints.go +++ b/xds/internal/client/client_watchers_endpoints.go @@ -19,7 +19,6 @@ package client import ( - "fmt" "time" "google.golang.org/grpc/xds/internal" @@ -81,13 +80,14 @@ type EndpointsUpdate struct { // after the watcher is canceled. The caller needs to handle this case. 
func (c *Client) WatchEndpoints(clusterName string, cb func(EndpointsUpdate, error)) (cancel func()) { wi := &watchInfo{ + c: c, typeURL: edsURL, target: clusterName, edsCallback: cb, } wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { - c.scheduleCallback(wi, EndpointsUpdate{}, fmt.Errorf("xds: EDS target %s not found, watcher timeout", clusterName)) + wi.timeout() }) return c.watch(wi) } diff --git a/xds/internal/client/client_watchers_lds.go b/xds/internal/client/client_watchers_lds.go index 9728cf80..0f7860db 100644 --- a/xds/internal/client/client_watchers_lds.go +++ b/xds/internal/client/client_watchers_lds.go @@ -19,7 +19,6 @@ package client import ( - "fmt" "time" ) @@ -35,13 +34,14 @@ type ldsCallbackFunc func(ldsUpdate, error) // after the watcher is canceled. The caller needs to handle this case. func (c *Client) watchLDS(serviceName string, cb ldsCallbackFunc) (cancel func()) { wi := &watchInfo{ + c: c, typeURL: ldsURL, target: serviceName, ldsCallback: cb, } wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { - c.scheduleCallback(wi, ldsUpdate{}, fmt.Errorf("xds: LDS target %s not found, watcher timeout", serviceName)) + wi.timeout() }) return c.watch(wi) } diff --git a/xds/internal/client/client_watchers_rds.go b/xds/internal/client/client_watchers_rds.go index e7f1cc32..b241a350 100644 --- a/xds/internal/client/client_watchers_rds.go +++ b/xds/internal/client/client_watchers_rds.go @@ -19,7 +19,6 @@ package client import ( - "fmt" "time" ) @@ -35,13 +34,14 @@ type rdsCallbackFunc func(rdsUpdate, error) // after the watcher is canceled. The caller needs to handle this case. 
func (c *Client) watchRDS(routeName string, cb rdsCallbackFunc) (cancel func()) { wi := &watchInfo{ + c: c, typeURL: rdsURL, target: routeName, rdsCallback: cb, } wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { - c.scheduleCallback(wi, rdsUpdate{}, fmt.Errorf("xds: RDS target %s not found, watcher timeout", routeName)) + wi.timeout() }) return c.watch(wi) }