xds: not cancel and restart RDS watch on the same LDS update (#3619)
This commit is contained in:
@ -62,6 +62,7 @@ type serviceUpdateWatcher struct {
|
||||
|
||||
mu sync.Mutex
|
||||
closed bool
|
||||
rdsName string
|
||||
rdsCancel func()
|
||||
}
|
||||
|
||||
@ -82,6 +83,12 @@ func (w *serviceUpdateWatcher) handleLDSResp(update ldsUpdate, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
if w.rdsName == update.routeName {
|
||||
// If the new routeName is same as the previous, don't cancel and
|
||||
// restart the RDS watch.
|
||||
return
|
||||
}
|
||||
w.rdsName = update.routeName
|
||||
if w.rdsCancel != nil {
|
||||
w.rdsCancel()
|
||||
}
|
||||
|
@ -337,3 +337,49 @@ func (s) TestServiceWatchWithClientClose(t *testing.T) {
|
||||
t.Fatal(cbErr)
|
||||
}
|
||||
}
|
||||
|
||||
// TestServiceNotCancelRDSOnSameLDSUpdate covers the case that if the second LDS
|
||||
// update contains the same RDS name as the previous, the RDS watch isn't
|
||||
// canceled and restarted.
|
||||
func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
|
||||
v2ClientCh, cleanup := overrideNewXDSV2Client()
|
||||
defer cleanup()
|
||||
|
||||
c, err := New(clientOpts(testXDSServer))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create client: %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
v2Client := <-v2ClientCh
|
||||
|
||||
serviceUpdateCh := testutils.NewChannel()
|
||||
c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
|
||||
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
|
||||
})
|
||||
|
||||
wantUpdate := ServiceUpdate{Cluster: testCDSName}
|
||||
|
||||
<-v2Client.addWatches[ldsURL]
|
||||
v2Client.r.newLDSUpdate(map[string]ldsUpdate{
|
||||
testLDSName: {routeName: testRDSName},
|
||||
})
|
||||
<-v2Client.addWatches[rdsURL]
|
||||
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
|
||||
testRDSName: {clusterName: testCDSName},
|
||||
})
|
||||
|
||||
if u, err := serviceUpdateCh.Receive(); err != nil || u != (serviceUpdateErr{wantUpdate, nil}) {
|
||||
t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
|
||||
}
|
||||
|
||||
// Another LDS update with a the same RDS_name.
|
||||
v2Client.r.newLDSUpdate(map[string]ldsUpdate{
|
||||
testLDSName: {routeName: testRDSName},
|
||||
})
|
||||
select {
|
||||
case <-v2Client.removeWatches[rdsURL]:
|
||||
t.Fatalf("unexpected rds watch cancel")
|
||||
case <-time.After(time.Second):
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user