xdsclient: deflake new transport ack/nack tests (#5830)

This commit is contained in:
Easwar Swaminathan
2022-12-01 10:25:30 -08:00
committed by GitHub
parent 99ba98231e
commit fa99649f0d

View File

@@ -79,18 +79,27 @@ var (
// verifies that an ACK is sent matching the version and nonce from the
// current response.
func (s) TestSimpleAckAndNack(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Create an xDS management server listening on a local port. Configure the
// request and response handlers to push on channels which are inspected by
// the test goroutine to verify ack version and nonce.
streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1)
streamResponseCh := make(chan *v3discoverypb.DiscoveryResponse, 1)
mgmtServer, err := e2e.StartManagementServer(&e2e.ManagementServerOptions{
OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
streamRequestCh <- req
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
select {
case streamRequestCh <- req:
case <-ctx.Done():
}
return nil
},
OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) {
streamResponseCh <- resp
select {
case streamResponseCh <- resp:
case <-ctx.Done():
}
},
})
if err != nil {
@@ -119,8 +128,6 @@ func (s) TestSimpleAckAndNack(t *testing.T) {
Name: resourceName,
ApiListener: apiListener,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
nodeID := uuid.New().String()
mgmtServer.Update(ctx, e2e.UpdateOptions{
NodeID: nodeID,
@@ -258,20 +265,26 @@ func (s) TestSimpleAckAndNack(t *testing.T) {
// TestInvalidFirstResponse tests the case where the first response is invalid.
// The test verifies that the NACK contains an empty version string.
func (s) TestInvalidFirstResponse(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Create an xDS management server listening on a local port. Configure the
// request and response handlers to push on channels which are inspected by
// the test goroutine to verify ack version and nonce.
streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1)
streamResponseCh := make(chan *v3discoverypb.DiscoveryResponse, 1)
mgmtServer, err := e2e.StartManagementServer(&e2e.ManagementServerOptions{
OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
streamRequestCh <- req
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
select {
case streamRequestCh <- req:
case <-ctx.Done():
}
return nil
},
OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) {
select {
case streamResponseCh <- resp:
default:
case <-ctx.Done():
}
},
})
@@ -302,8 +315,6 @@ func (s) TestInvalidFirstResponse(t *testing.T) {
ApiListener: apiListener,
UseOriginalDst: &wrapperspb.BoolValue{Value: true}, // This will cause the resource to be NACKed.
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
nodeID := uuid.New().String()
mgmtServer.Update(ctx, e2e.UpdateOptions{
NodeID: nodeID,
@@ -384,18 +395,27 @@ func (s) TestInvalidFirstResponse(t *testing.T) {
// 3. The same resource is requested again. The test verifies that the request
// is sent with the previously ACKed version.
func (s) TestResourceIsNotRequestedAnymore(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Create an xDS management server listening on a local port. Configure the
// request and response handlers to push on channels which are inspected by
// the test goroutine to verify ack version and nonce.
streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1)
streamResponseCh := make(chan *v3discoverypb.DiscoveryResponse, 1)
mgmtServer, err := e2e.StartManagementServer(&e2e.ManagementServerOptions{
OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
streamRequestCh <- req
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
select {
case streamRequestCh <- req:
case <-ctx.Done():
}
return nil
},
OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) {
streamResponseCh <- resp
select {
case streamResponseCh <- resp:
case <-ctx.Done():
}
},
})
if err != nil {
@@ -424,8 +444,6 @@ func (s) TestResourceIsNotRequestedAnymore(t *testing.T) {
Name: resourceName,
ApiListener: apiListener,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
nodeID := uuid.New().String()
mgmtServer.Update(ctx, e2e.UpdateOptions{
NodeID: nodeID,