diff --git a/xds/internal/xdsclient/transport/transport_ack_nack_test.go b/xds/internal/xdsclient/transport/transport_ack_nack_test.go index b9eeb02e..3d574090 100644 --- a/xds/internal/xdsclient/transport/transport_ack_nack_test.go +++ b/xds/internal/xdsclient/transport/transport_ack_nack_test.go @@ -79,18 +79,27 @@ var ( // verifies that an ACK is sent matching the version and nonce from the // current response. func (s) TestSimpleAckAndNack(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + // Create an xDS management server listening on a local port. Configure the // request and response handlers to push on channels which are inspected by // the test goroutine to verify ack version and nonce. streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) streamResponseCh := make(chan *v3discoverypb.DiscoveryResponse, 1) mgmtServer, err := e2e.StartManagementServer(&e2e.ManagementServerOptions{ - OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { - streamRequestCh <- req + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + select { + case streamRequestCh <- req: + case <-ctx.Done(): + } return nil }, OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { - streamResponseCh <- resp + select { + case streamResponseCh <- resp: + case <-ctx.Done(): + } }, }) if err != nil { @@ -119,8 +128,6 @@ func (s) TestSimpleAckAndNack(t *testing.T) { Name: resourceName, ApiListener: apiListener, } - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() nodeID := uuid.New().String() mgmtServer.Update(ctx, e2e.UpdateOptions{ NodeID: nodeID, @@ -258,20 +265,26 @@ func (s) TestSimpleAckAndNack(t *testing.T) { // TestInvalidFirstResponse tests the case where the first response is invalid. // The test verifies that the NACK contains an empty version string. func (s) TestInvalidFirstResponse(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + // Create an xDS management server listening on a local port. Configure the // request and response handlers to push on channels which are inspected by // the test goroutine to verify ack version and nonce. streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) streamResponseCh := make(chan *v3discoverypb.DiscoveryResponse, 1) mgmtServer, err := e2e.StartManagementServer(&e2e.ManagementServerOptions{ - OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { - streamRequestCh <- req + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + select { + case streamRequestCh <- req: + case <-ctx.Done(): + } return nil }, OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { select { case streamResponseCh <- resp: - default: + case <-ctx.Done(): } }, }) @@ -302,8 +315,6 @@ func (s) TestInvalidFirstResponse(t *testing.T) { ApiListener: apiListener, UseOriginalDst: &wrapperspb.BoolValue{Value: true}, // This will cause the resource to be NACKed. } - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() nodeID := uuid.New().String() mgmtServer.Update(ctx, e2e.UpdateOptions{ NodeID: nodeID, @@ -384,18 +395,27 @@ func (s) TestInvalidFirstResponse(t *testing.T) { // 3. The same resource is requested again. The test verifies that the request // is sent with the previously ACKed version. func (s) TestResourceIsNotRequestedAnymore(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + // Create an xDS management server listening on a local port. Configure the // request and response handlers to push on channels which are inspected by // the test goroutine to verify ack version and nonce. streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) streamResponseCh := make(chan *v3discoverypb.DiscoveryResponse, 1) mgmtServer, err := e2e.StartManagementServer(&e2e.ManagementServerOptions{ - OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { - streamRequestCh <- req + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + select { + case streamRequestCh <- req: + case <-ctx.Done(): + } return nil }, OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { - streamResponseCh <- resp + select { + case streamResponseCh <- resp: + case <-ctx.Done(): + } }, }) if err != nil { @@ -424,8 +444,6 @@ func (s) TestResourceIsNotRequestedAnymore(t *testing.T) { Name: resourceName, ApiListener: apiListener, } - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() nodeID := uuid.New().String() mgmtServer.Update(ctx, e2e.UpdateOptions{ NodeID: nodeID,