xds: fix RDS flaky tests due to ACK (#3279)
Clear ACK request before starting the real RDS tests. This should only be necessary for RDS tests because RDS needs an existing LDS.
This commit is contained in:
@ -160,19 +160,12 @@ func TestRDSGetClusterFromRouteConfiguration(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestRDSHandleResponse starts a fake xDS server, makes a ClientConn to it,
|
// doLDS makes a LDS watch, and waits for the response and ack to finish.
|
||||||
// and creates a v2Client using it. Then, it registers an LDS and RDS watcher
|
//
|
||||||
// and tests different RDS responses.
|
// This is called by RDS tests to start LDS first, because LDS is a
|
||||||
func TestRDSHandleResponse(t *testing.T) {
|
// pre-requirement for RDS, and RDS handle would fail without an existing LDS
|
||||||
fakeServer, sCleanup := fakexds.StartServer(t)
|
// watch.
|
||||||
client, cCleanup := fakeServer.GetClientConn(t)
|
func doLDS(t *testing.T, v2c *v2Client, fakeServer *fakexds.Server) {
|
||||||
defer func() {
|
|
||||||
cCleanup()
|
|
||||||
sCleanup()
|
|
||||||
}()
|
|
||||||
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
|
|
||||||
defer v2c.close()
|
|
||||||
|
|
||||||
// Register an LDS watcher, and wait till the request is sent out, the
|
// Register an LDS watcher, and wait till the request is sent out, the
|
||||||
// response is received and the callback is invoked.
|
// response is received and the callback is invoked.
|
||||||
cbCh := make(chan error, 1)
|
cbCh := make(chan error, 1)
|
||||||
@ -187,6 +180,21 @@ func TestRDSHandleResponse(t *testing.T) {
|
|||||||
}
|
}
|
||||||
// Read the LDS ack, to clear RequestChan for following tests.
|
// Read the LDS ack, to clear RequestChan for following tests.
|
||||||
<-fakeServer.RequestChan
|
<-fakeServer.RequestChan
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestRDSHandleResponse starts a fake xDS server, makes a ClientConn to it,
|
||||||
|
// and creates a v2Client using it. Then, it registers an LDS and RDS watcher
|
||||||
|
// and tests different RDS responses.
|
||||||
|
func TestRDSHandleResponse(t *testing.T) {
|
||||||
|
fakeServer, sCleanup := fakexds.StartServer(t)
|
||||||
|
client, cCleanup := fakeServer.GetClientConn(t)
|
||||||
|
defer func() {
|
||||||
|
cCleanup()
|
||||||
|
sCleanup()
|
||||||
|
}()
|
||||||
|
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
|
||||||
|
defer v2c.close()
|
||||||
|
doLDS(t, v2c, fakeServer)
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
@ -283,19 +291,7 @@ func TestRDSHandleResponseWithoutRDSWatch(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
|
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
|
||||||
defer v2c.close()
|
defer v2c.close()
|
||||||
|
doLDS(t, v2c, fakeServer)
|
||||||
// Register an LDS watcher, and wait till the request is sent out, the
|
|
||||||
// response is received and the callback is invoked.
|
|
||||||
cbCh := make(chan error, 1)
|
|
||||||
v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
|
|
||||||
t.Logf("v2c.watchLDS callback, ldsUpdate: %+v, err: %v", u, err)
|
|
||||||
cbCh <- err
|
|
||||||
})
|
|
||||||
<-fakeServer.RequestChan
|
|
||||||
fakeServer.ResponseChan <- &fakexds.Response{Resp: goodLDSResponse1}
|
|
||||||
if err := <-cbCh; err != nil {
|
|
||||||
t.Fatalf("v2c.watchLDS returned error in callback: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if v2c.handleRDSResponse(goodRDSResponse1) == nil {
|
if v2c.handleRDSResponse(goodRDSResponse1) == nil {
|
||||||
t.Fatal("v2c.handleRDSResponse() succeeded, should have failed")
|
t.Fatal("v2c.handleRDSResponse() succeeded, should have failed")
|
||||||
@ -334,20 +330,7 @@ func testRDSCaching(t *testing.T, rdsTestOps []rdsTestOp, errCh chan error) {
|
|||||||
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
|
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
|
||||||
defer v2c.close()
|
defer v2c.close()
|
||||||
t.Log("Started xds v2Client...")
|
t.Log("Started xds v2Client...")
|
||||||
|
doLDS(t, v2c, fakeServer)
|
||||||
// Register an LDS watcher, and wait till the request is sent out, the
|
|
||||||
// response is received and the callback is invoked.
|
|
||||||
cbCh := make(chan error, 1)
|
|
||||||
v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
|
|
||||||
t.Logf("v2c.watchLDS callback, ldsUpdate: %+v, err: %v", u, err)
|
|
||||||
cbCh <- err
|
|
||||||
})
|
|
||||||
<-fakeServer.RequestChan
|
|
||||||
fakeServer.ResponseChan <- &fakexds.Response{Resp: goodLDSResponse1}
|
|
||||||
if err := <-cbCh; err != nil {
|
|
||||||
errCh <- fmt.Errorf("v2c.watchLDS returned error in callback: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
callbackCh := make(chan struct{}, 1)
|
callbackCh := make(chan struct{}, 1)
|
||||||
for _, rdsTestOp := range rdsTestOps {
|
for _, rdsTestOp := range rdsTestOps {
|
||||||
@ -471,20 +454,8 @@ func TestRDSWatchExpiryTimer(t *testing.T) {
|
|||||||
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
|
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
|
||||||
defer v2c.close()
|
defer v2c.close()
|
||||||
t.Log("Started xds v2Client...")
|
t.Log("Started xds v2Client...")
|
||||||
|
doLDS(t, v2c, fakeServer)
|
||||||
|
|
||||||
// Register an LDS watcher, and wait till the request is sent out, the
|
|
||||||
// response is received and the callback is invoked.
|
|
||||||
ldsCallbackCh := make(chan struct{})
|
|
||||||
v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
|
|
||||||
t.Logf("v2c.watchLDS callback, ldsUpdate: %+v, err: %v", u, err)
|
|
||||||
close(ldsCallbackCh)
|
|
||||||
})
|
|
||||||
<-fakeServer.RequestChan
|
|
||||||
fakeServer.ResponseChan <- &fakexds.Response{Resp: goodLDSResponse1}
|
|
||||||
<-ldsCallbackCh
|
|
||||||
|
|
||||||
// Wait till the request makes it to the fakeServer. This ensures that
|
|
||||||
// the watch request has been processed by the v2Client.
|
|
||||||
rdsCallbackCh := make(chan error, 1)
|
rdsCallbackCh := make(chan error, 1)
|
||||||
v2c.watchRDS(goodRouteName1, func(u rdsUpdate, err error) {
|
v2c.watchRDS(goodRouteName1, func(u rdsUpdate, err error) {
|
||||||
t.Logf("Received callback with rdsUpdate {%+v} and error {%v}", u, err)
|
t.Logf("Received callback with rdsUpdate {%+v} and error {%v}", u, err)
|
||||||
@ -496,9 +467,11 @@ func TestRDSWatchExpiryTimer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
rdsCallbackCh <- nil
|
rdsCallbackCh <- nil
|
||||||
})
|
})
|
||||||
|
// Wait till the request makes it to the fakeServer. This ensures that
|
||||||
|
// the watch request has been processed by the v2Client.
|
||||||
<-fakeServer.RequestChan
|
<-fakeServer.RequestChan
|
||||||
|
|
||||||
timer := time.NewTimer(2 * time.Second)
|
timer := time.NewTimer(2 * defaultWatchExpiryTimeout)
|
||||||
select {
|
select {
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
t.Fatalf("Timeout expired when expecting RDS update")
|
t.Fatalf("Timeout expired when expecting RDS update")
|
||||||
|
Reference in New Issue
Block a user