From 11399004a9fb27372616637994650e18ca5e1ecb Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 10 Dec 2019 11:20:06 -0800 Subject: [PATCH] xds: resolver test cleanup (#3233) This PR also adds a new testutils directory with a fake client and a channel which supports a timed receive operation. In follow up PRs, we can move other common test stuff like the fake server etc to this directory and cleanup more tests. --- xds/internal/resolver/xds_resolver_test.go | 185 ++++++++------------- xds/internal/testutils/channel.go | 70 ++++++++ xds/internal/testutils/xdsclient.go | 67 ++++++++ 3 files changed, 207 insertions(+), 115 deletions(-) create mode 100644 xds/internal/testutils/channel.go create mode 100644 xds/internal/testutils/xdsclient.go diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 9e3bc322..7a7b40c1 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -24,13 +24,16 @@ import ( "fmt" "net" "testing" - "time" "google.golang.org/grpc" + "google.golang.org/grpc/internal" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" + xdsinternal "google.golang.org/grpc/xds/internal" + _ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // To parse LB config xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/bootstrap" + "google.golang.org/grpc/xds/internal/testutils" corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" ) @@ -55,62 +58,26 @@ var ( // event through a channel. type testClientConn struct { resolver.ClientConn - stateCh chan struct{} - errorCh chan struct{} - gotState resolver.State + stateCh *testutils.Channel + errorCh *testutils.Channel } func (t *testClientConn) UpdateState(s resolver.State) { - t.gotState = s - t.stateCh <- struct{}{} + t.stateCh.Send(s) } func (t *testClientConn) ReportError(err error) { - t.errorCh <- struct{}{} + t.errorCh.Send(err) } -func (*testClientConn) ParseServiceConfig(_ string) *serviceconfig.ParseResult { - return &serviceconfig.ParseResult{} - // TODO: Uncomment this once we have a "experimental_cds" balancer. - // return internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(jsonSC) +func (t *testClientConn) ParseServiceConfig(jsonSC string) *serviceconfig.ParseResult { + return internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(jsonSC) } func newTestClientConn() *testClientConn { return &testClientConn{ - stateCh: make(chan struct{}, 1), - errorCh: make(chan struct{}, 1), - } -} - -// fakeXDSClient is a fake implementation of the xdsClientInterface interface. -// All it does is to store the parameters received in the watch call and -// signals various events by sending on channels. -type fakeXDSClient struct { - gotTarget string - gotCallback func(xdsclient.ServiceUpdate, error) - suCh chan struct{} - cancel chan struct{} - done chan struct{} -} - -func (f *fakeXDSClient) WatchService(target string, callback func(xdsclient.ServiceUpdate, error)) func() { - f.gotTarget = target - f.gotCallback = callback - f.suCh <- struct{}{} - return func() { - f.cancel <- struct{}{} - } -} - -func (f *fakeXDSClient) Close() { - f.done <- struct{}{} -} - -func newFakeXDSClient() *fakeXDSClient { - return &fakeXDSClient{ - suCh: make(chan struct{}, 1), - cancel: make(chan struct{}, 1), - done: make(chan struct{}, 1), + stateCh: testutils.NewChannel(), + errorCh: testutils.NewChannel(), } } @@ -130,7 +97,7 @@ func getXDSClientMakerFunc(wantOpts xdsclient.Options) func(xdsclient.Options) ( if len(gotOpts.DialOpts) != len(wantOpts.DialOpts) { return nil, fmt.Errorf("got len(DialOpts): %v, want: %v", len(gotOpts.DialOpts), len(wantOpts.DialOpts)) } - return newFakeXDSClient(), nil + return testutils.NewXDSClient(), nil } } @@ -237,6 +204,8 @@ type setupOpts struct { } func testSetup(t *testing.T, opts setupOpts) (*xdsResolver, *testClientConn, func()) { + t.Helper() + oldConfigMaker := newXDSConfig newXDSConfig = func() *bootstrap.Config { return opts.config } oldClientMaker := newXDSClient @@ -259,107 +228,93 @@ func testSetup(t *testing.T, opts setupOpts) (*xdsResolver, *testClientConn, fun return r.(*xdsResolver), tcc, cancel } +// waitForWatchService waits for the WatchService method to be called on the +// xdsClient within a reasonable amount of time, and also verifies that the +// watch is called with the expected target. +func waitForWatchService(t *testing.T, xdsC *testutils.XDSClient, wantTarget string) { + t.Helper() + + gotTarget, err := xdsC.WaitForWatchService() + if err != nil { + t.Fatalf("xdsClient.WatchService failed with error: %v", err) + } + if gotTarget != wantTarget { + t.Fatalf("xdsClient.WatchService() called with target: %v, want %v", gotTarget, wantTarget) + } +} + // TestXDSResolverWatchCallbackAfterClose tests the case where a service update // from the underlying xdsClient is received after the resolver is closed. func TestXDSResolverWatchCallbackAfterClose(t *testing.T) { - fakeXDSClient := newFakeXDSClient() + xdsC := testutils.NewXDSClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ config: &validConfig, - xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return fakeXDSClient, nil }, + xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return xdsC, nil }, }) defer cancel() - // Wait for the WatchService method to be called on the xdsClient. - <-fakeXDSClient.suCh - if fakeXDSClient.gotTarget != targetStr { - t.Fatalf("xdsClient.WatchService() called with target: %v, want %v", fakeXDSClient.gotTarget, targetStr) - } + waitForWatchService(t, xdsC, targetStr) - // Call the watchAPI callback after closing the resolver. + // Call the watchAPI callback after closing the resolver, and make sure no + // update is triggerred on the ClientConn. xdsR.Close() - fakeXDSClient.gotCallback(xdsclient.ServiceUpdate{Cluster: cluster}, nil) - - timer := time.NewTimer(1 * time.Second) - select { - case <-timer.C: - case <-tcc.stateCh: - timer.Stop() - t.Fatalf("ClientConn.UpdateState called after xdsResolver is closed") + xdsC.InvokeWatchServiceCb(cluster, nil) + if gotVal, gotErr := tcc.stateCh.Receive(); gotErr != testutils.ErrRecvTimeout { + t.Fatalf("ClientConn.UpdateState called after xdsResolver is closed: %v", gotVal) } } // TestXDSResolverBadServiceUpdate tests the case the xdsClient returns a bad // service update. func TestXDSResolverBadServiceUpdate(t *testing.T) { - fakeXDSClient := newFakeXDSClient() + xdsC := testutils.NewXDSClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ config: &validConfig, - xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return fakeXDSClient, nil }, + xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return xdsC, nil }, }) - defer cancel() - defer xdsR.Close() + defer func() { + cancel() + xdsR.Close() + }() - // Wait for the WatchService method to be called on the xdsClient. - timer := time.NewTimer(1 * time.Second) - select { - case <-timer.C: - t.Fatal("Timeout when waiting for WatchService to be called on xdsClient") - case <-fakeXDSClient.suCh: - if !timer.Stop() { - <-timer.C - } - } - if fakeXDSClient.gotTarget != targetStr { - t.Fatalf("xdsClient.WatchService() called with target: %v, want %v", fakeXDSClient.gotTarget, targetStr) - } + waitForWatchService(t, xdsC, targetStr) - // Invoke the watchAPI callback with a bad service update. - fakeXDSClient.gotCallback(xdsclient.ServiceUpdate{Cluster: ""}, errors.New("bad serviceupdate")) - - // Wait for the ReportError method to be called on the ClientConn. - timer = time.NewTimer(1 * time.Second) - select { - case <-timer.C: - t.Fatal("Timeout when waiting for UpdateState to be called on the ClientConn") - case <-tcc.errorCh: - timer.Stop() + // Invoke the watchAPI callback with a bad service update and wait for the + // ReportError method to be called on the ClientConn. + suErr := errors.New("bad serviceupdate") + xdsC.InvokeWatchServiceCb("", suErr) + if gotErrVal, gotErr := tcc.errorCh.Receive(); gotErr != nil || gotErrVal != suErr { + t.Fatalf("ClientConn.ReportError() received %v, want %v", gotErrVal, suErr) } } // TestXDSResolverGoodServiceUpdate tests the happy case where the resolver // gets a good service update from the xdsClient. func TestXDSResolverGoodServiceUpdate(t *testing.T) { - fakeXDSClient := newFakeXDSClient() + xdsC := testutils.NewXDSClient() xdsR, tcc, cancel := testSetup(t, setupOpts{ config: &validConfig, - xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return fakeXDSClient, nil }, + xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return xdsC, nil }, }) - defer cancel() - defer xdsR.Close() + defer func() { + cancel() + xdsR.Close() + }() - // Wait for the WatchService method to be called on the xdsClient. - timer := time.NewTimer(1 * time.Second) - select { - case <-timer.C: - t.Fatal("Timeout when waiting for WatchService to be called on xdsClient") - case <-fakeXDSClient.suCh: - if !timer.Stop() { - <-timer.C - } + waitForWatchService(t, xdsC, targetStr) + + // Invoke the watchAPI callback with a good service update and wait for the + // UpdateState method to be called on the ClientConn. + xdsC.InvokeWatchServiceCb(cluster, nil) + gotState, err := tcc.stateCh.Receive() + if err != nil { + t.Fatalf("ClientConn.UpdateState returned error: %v", err) } - if fakeXDSClient.gotTarget != targetStr { - t.Fatalf("xdsClient.WatchService() called with target: %v, want %v", fakeXDSClient.gotTarget, targetStr) + rState := gotState.(resolver.State) + if gotClient := rState.Attributes.Value(xdsinternal.XDSClientID); gotClient != xdsC { + t.Fatalf("ClientConn.UpdateState got xdsClient: %v, want %v", gotClient, xdsC) } - - // Invoke the watchAPI callback with a good service update. - fakeXDSClient.gotCallback(xdsclient.ServiceUpdate{Cluster: cluster}, nil) - - // Wait for the UpdateState method to be called on the ClientConn. - timer = time.NewTimer(1 * time.Second) - select { - case <-timer.C: - t.Fatal("Timeout when waiting for ReportError to be called on the ClientConn") - case <-tcc.stateCh: - timer.Stop() + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) } } diff --git a/xds/internal/testutils/channel.go b/xds/internal/testutils/channel.go new file mode 100644 index 00000000..867d5ca3 --- /dev/null +++ b/xds/internal/testutils/channel.go @@ -0,0 +1,70 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package testutils provides multiple utility types, for use in xds tests. +package testutils + +import ( + "errors" + "time" +) + +// ErrRecvTimeout is an error to indicate that a receive operation on the +// channel timed out. +var ErrRecvTimeout = errors.New("timed out when waiting for value on channel") + +const ( + // DefaultChanRecvTimeout is the default timeout for receive operations on the + // underlying channel. + DefaultChanRecvTimeout = 1 * time.Second + // DefaultChanBufferSize is the default buffer size of the underlying channel. + DefaultChanBufferSize = 1 +) + +// Channel wraps a generic channel and provides a timed receive operation. +type Channel struct { + ch chan interface{} +} + +// Send sends value on the underlying channel. +func (cwt *Channel) Send(value interface{}) { + cwt.ch <- value +} + +// TimedReceive returns the value received on the underlying channel, or +// ErrRecvTimeout if timeout amount of time elapsed. +func (cwt *Channel) TimedReceive(timeout time.Duration) (interface{}, error) { + timer := time.NewTimer(timeout) + select { + case <-timer.C: + return nil, ErrRecvTimeout + case got := <-cwt.ch: + timer.Stop() + return got, nil + } +} + +// Receive returns the value received on the underlying channel, or +// ErrRecvTimeout if DefaultChanRecvTimeout amount of time elapses. +func (cwt *Channel) Receive() (interface{}, error) { + return cwt.TimedReceive(DefaultChanRecvTimeout) +} + +// NewChannel returns a new Channel. +func NewChannel() *Channel { + return &Channel{ch: make(chan interface{}, DefaultChanBufferSize)} +} diff --git a/xds/internal/testutils/xdsclient.go b/xds/internal/testutils/xdsclient.go new file mode 100644 index 00000000..566ba8a0 --- /dev/null +++ b/xds/internal/testutils/xdsclient.go @@ -0,0 +1,67 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package testutils + +import ( + xdsclient "google.golang.org/grpc/xds/internal/client" +) + +// XDSClient is a fake implementation of an xds client. It exposes a bunch of +// channels to signal the occurrence of various events. +type XDSClient struct { + serviceCb func(xdsclient.ServiceUpdate, error) + suWatchCh *Channel + closeCh *Channel + suCancelCh *Channel +} + +// WatchService registers a LDS/RDS watch. +func (xdsC *XDSClient) WatchService(target string, callback func(xdsclient.ServiceUpdate, error)) func() { + xdsC.serviceCb = callback + xdsC.suWatchCh.Send(target) + return func() { + xdsC.suCancelCh.Send(nil) + } +} + +// WaitForWatchService waits for WatchService to be invoked on this client +// within a reasonable timeout. +func (xdsC *XDSClient) WaitForWatchService() (string, error) { + val, err := xdsC.suWatchCh.Receive() + return val.(string), err +} + +// InvokeWatchServiceCb invokes the registered service watch callback. +func (xdsC *XDSClient) InvokeWatchServiceCb(cluster string, err error) { + xdsC.serviceCb(xdsclient.ServiceUpdate{Cluster: cluster}, err) +} + +// Close closes the xds client. +func (xdsC *XDSClient) Close() { + xdsC.closeCh.Send(nil) +} + +// NewXDSClient returns a new fake xds client. +func NewXDSClient() *XDSClient { + return &XDSClient{ + suWatchCh: NewChannel(), + closeCh: NewChannel(), + suCancelCh: NewChannel(), + } +}