From 24b653e8ccbbe44217a52463a5dc564a70c06489 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 20 Nov 2019 09:02:53 -0800 Subject: [PATCH] xds: Don't override dialFunc in xdsClient in tests. (#3191) * Add a helper to the fakexds package to return a ClientConn talking to the fake server. * Tests will make use of this ClientConn wherever required, or they will directly pass the fake server's address as the balancerName to the xdsclient.New() function, thus exercising that code path as well. * Add grpc.WithTimeout to list in vet.sh --- vet.sh | 1 + xds/internal/client/client.go | 8 +- xds/internal/client/client_test.go | 105 +++++++------------------ xds/internal/client/eds_test.go | 25 ++++-- xds/internal/client/fakexds/fakexds.go | 42 +++++----- xds/internal/client/lds_test.go | 24 ++++-- xds/internal/client/rds_test.go | 42 +++++++--- xds/internal/client/v2client_test.go | 26 ++++-- 8 files changed, 134 insertions(+), 139 deletions(-) diff --git a/vet.sh b/vet.sh index 88825e9d..b798aa25 100755 --- a/vet.sh +++ b/vet.sh @@ -147,6 +147,7 @@ grpc.WithDecompressor grpc.WithDialer grpc.WithMaxMsgSize grpc.WithServiceConfig +grpc.WithTimeout http.CloseNotifier naming.Resolver naming.Update diff --git a/xds/internal/client/client.go b/xds/internal/client/client.go index 4be85da8..b94ce751 100644 --- a/xds/internal/client/client.go +++ b/xds/internal/client/client.go @@ -21,7 +21,6 @@ package client import ( - "context" "errors" "fmt" "sync" @@ -32,9 +31,6 @@ import ( "google.golang.org/grpc/xds/internal/client/bootstrap" ) -// For overriding in unittests. -var dialFunc = grpc.DialContext - // Options provides all parameters required for the creation of an xDS client. type Options struct { // Config contains a fully populated bootstrap config. It is the @@ -71,9 +67,7 @@ func New(opts Options) (*Client, error) { } dopts := append([]grpc.DialOption{opts.Config.Creds}, opts.DialOpts...) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cc, err := dialFunc(ctx, opts.Config.BalancerName, dopts...) + cc, err := grpc.Dial(opts.Config.BalancerName, dopts...) if err != nil { // An error from a non-blocking dial indicates something serious. return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", opts.Config.BalancerName, err) diff --git a/xds/internal/client/client_test.go b/xds/internal/client/client_test.go index 03bc887d..df5fbfaf 100644 --- a/xds/internal/client/client_test.go +++ b/xds/internal/client/client_test.go @@ -19,7 +19,6 @@ package client import ( - "context" "errors" "fmt" "testing" @@ -32,20 +31,27 @@ import ( corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" ) -const balancerName = "dummyBalancer" - -var validConfig = bootstrap.Config{ - BalancerName: balancerName, - Creds: grpc.WithInsecure(), - NodeProto: &corepb.Node{}, +func clientOpts(balancerName string) Options { + return Options{ + Config: bootstrap.Config{ + BalancerName: balancerName, + Creds: grpc.WithInsecure(), + NodeProto: &corepb.Node{}, + }, + // WithTimeout is deprecated. But we are OK to call it here from the + // test, so we clearly know that the dial failed. + DialOpts: []grpc.DialOption{grpc.WithTimeout(5 * time.Second), grpc.WithBlock()}, + } } func TestNew(t *testing.T) { + fakeServer, cleanup := fakexds.StartServer(t) + defer cleanup() + tests := []struct { - name string - opts Options - wantErr bool - wantDialOptsLen int + name string + opts Options + wantErr bool }{ {name: "empty-opts", opts: Options{}, wantErr: true}, { @@ -79,37 +85,14 @@ func TestNew(t *testing.T) { wantErr: true, }, { - name: "without-extra-dialoptions", - opts: Options{Config: validConfig}, - wantErr: false, - wantDialOptsLen: 1, - }, - { - name: "without-extra-dialoptions", - opts: Options{ - Config: validConfig, - DialOpts: []grpc.DialOption{grpc.WithDisableRetry()}, - }, - wantErr: false, - wantDialOptsLen: 2, + name: "happy-case", + opts: clientOpts(fakeServer.Address), + wantErr: false, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - oldDialFunc := dialFunc - dialFunc = func(ctx context.Context, target string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { - if target != balancerName { - t.Fatalf("got target: %v, want %v", target, balancerName) - } - if len(dopts) != test.wantDialOptsLen { - t.Fatalf("got %d dialOptions, want %d", len(dopts), test.wantDialOptsLen) - } - return grpc.DialContext(ctx, target, dopts...) - } - defer func() { - dialFunc = oldDialFunc - }() if _, err := New(test.opts); (err != nil) != test.wantErr { t.Fatalf("New(%+v) = %v, wantErr: %v", test.opts, err, test.wantErr) } @@ -120,18 +103,10 @@ func TestNew(t *testing.T) { // TestWatchService tests the happy case of registering a watcher for // service updates and receiving a good update. func TestWatchService(t *testing.T) { - fakeServer, fakeCC, cleanup := fakexds.StartClientAndServer(t) + fakeServer, cleanup := fakexds.StartServer(t) defer cleanup() - oldDialFunc := dialFunc - dialFunc = func(_ context.Context, _ string, _ ...grpc.DialOption) (*grpc.ClientConn, error) { - return fakeCC, nil - } - defer func() { - dialFunc = oldDialFunc - }() - - xdsClient, err := New(Options{Config: validConfig}) + xdsClient, err := New(clientOpts(fakeServer.Address)) if err != nil { t.Fatalf("New returned error: %v", err) } @@ -176,18 +151,10 @@ func TestWatchService(t *testing.T) { // registering a service update watcher. The underlying v2Client will timeout // and will send us an error. func TestWatchServiceWithNoResponseFromServer(t *testing.T) { - fakeServer, fakeCC, cleanup := fakexds.StartClientAndServer(t) + fakeServer, cleanup := fakexds.StartServer(t) defer cleanup() - oldDialFunc := dialFunc - dialFunc = func(_ context.Context, _ string, _ ...grpc.DialOption) (*grpc.ClientConn, error) { - return fakeCC, nil - } - defer func() { - dialFunc = oldDialFunc - }() - - xdsClient, err := New(Options{Config: validConfig}) + xdsClient, err := New(clientOpts(fakeServer.Address)) if err != nil { t.Fatalf("New returned error: %v", err) } @@ -233,18 +200,10 @@ func TestWatchServiceWithNoResponseFromServer(t *testing.T) { // TestWatchServiceEmptyRDS tests the case where the underlying // v2Client receives an empty RDS response. func TestWatchServiceEmptyRDS(t *testing.T) { - fakeServer, fakeCC, cleanup := fakexds.StartClientAndServer(t) + fakeServer, cleanup := fakexds.StartServer(t) defer cleanup() - oldDialFunc := dialFunc - dialFunc = func(_ context.Context, _ string, _ ...grpc.DialOption) (*grpc.ClientConn, error) { - return fakeCC, nil - } - defer func() { - dialFunc = oldDialFunc - }() - - xdsClient, err := New(Options{Config: validConfig}) + xdsClient, err := New(clientOpts(fakeServer.Address)) if err != nil { t.Fatalf("New returned error: %v", err) } @@ -294,18 +253,10 @@ func TestWatchServiceEmptyRDS(t *testing.T) { // received after the client is closed, and we make sure that the registered // watcher callback is not invoked. func TestWatchServiceWithClientClose(t *testing.T) { - fakeServer, fakeCC, cleanup := fakexds.StartClientAndServer(t) + fakeServer, cleanup := fakexds.StartServer(t) defer cleanup() - oldDialFunc := dialFunc - dialFunc = func(_ context.Context, _ string, _ ...grpc.DialOption) (*grpc.ClientConn, error) { - return fakeCC, nil - } - defer func() { - dialFunc = oldDialFunc - }() - - xdsClient, err := New(Options{Config: validConfig}) + xdsClient, err := New(clientOpts(fakeServer.Address)) if err != nil { t.Fatalf("New returned error: %v", err) } diff --git a/xds/internal/client/eds_test.go b/xds/internal/client/eds_test.go index 26b50951..2b46f445 100644 --- a/xds/internal/client/eds_test.go +++ b/xds/internal/client/eds_test.go @@ -162,8 +162,12 @@ var ( ) func TestHandleEDSResponse(t *testing.T) { - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) tests := []struct { @@ -271,8 +275,12 @@ func TestHandleEDSResponse(t *testing.T) { // TestHandleEDSResponseWithoutEDSWatch tests the case where the v2Client // receives an EDS response without a registered EDS watcher. func TestHandleEDSResponseWithoutEDSWatch(t *testing.T) { - _, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) if v2c.handleEDSResponse(goodEDSResponse1) == nil { @@ -287,9 +295,12 @@ func TestEDSWatchExpiryTimer(t *testing.T) { defaultWatchExpiryTimeout = oldWatchExpiryTimeout }() - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() - + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...") diff --git a/xds/internal/client/fakexds/fakexds.go b/xds/internal/client/fakexds/fakexds.go index 0b9386bf..814100fa 100644 --- a/xds/internal/client/fakexds/fakexds.go +++ b/xds/internal/client/fakexds/fakexds.go @@ -60,19 +60,16 @@ type Server struct { // ResponseChan is a buffered channel from which the fake server reads the // responses that it must send out to the client. ResponseChan chan *Response + // Address is the host:port on which the fake xdsServer is listening on. + Address string } -// StartClientAndServer starts a fakexds.Server and creates a ClientConn -// talking to it. The returned cleanup function should be invoked by the caller -// once the test is done. -// TODO: Split this into two funcs, one to return a server and one to return a -// ClientConn connected to this server, and change tests accordingly. -func StartClientAndServer(t *testing.T) (*Server, *grpc.ClientConn, func()) { +// StartServer starts a fakexds.Server. The returned function should be invoked +// by the caller once the test is done. +func StartServer(t *testing.T) (*Server, func()) { t.Helper() - var lis net.Listener - var err error - lis, err = net.Listen("tcp", "localhost:0") + lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("net.Listen() failed: %v", err) } @@ -81,30 +78,31 @@ func StartClientAndServer(t *testing.T) (*Server, *grpc.ClientConn, func()) { fs := &Server{ RequestChan: make(chan *Request, defaultChannelBufferSize), ResponseChan: make(chan *Response, defaultChannelBufferSize), + Address: lis.Addr().String(), } adsgrpc.RegisterAggregatedDiscoveryServiceServer(server, fs) go server.Serve(lis) - t.Logf("Starting fake xDS server at %v...", lis.Addr().String()) - defer func() { - if err != nil { - server.Stop() - lis.Close() - } - }() + t.Logf("Starting fake xDS server at %v...", fs.Address) + + return fs, func() { server.Stop() } +} + +// GetClientConn returns a grpc.ClientConn talking to the fake server. The +// returned function should be invoked by the caller once the test is done. +func (fs *Server) GetClientConn(t *testing.T) (*grpc.ClientConn, func()) { + t.Helper() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - var cc *grpc.ClientConn - cc, err = grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock()) + cc, err := grpc.DialContext(ctx, fs.Address, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { - t.Fatalf("grpc.DialContext(%s) failed: %v", lis.Addr().String(), err) + t.Fatalf("grpc.DialContext(%s) failed: %v", fs.Address, err) } t.Log("Started xDS gRPC client...") - return fs, cc, func() { - server.Stop() - lis.Close() + return cc, func() { + cc.Close() } } diff --git a/xds/internal/client/lds_test.go b/xds/internal/client/lds_test.go index 535e1665..e76b6f79 100644 --- a/xds/internal/client/lds_test.go +++ b/xds/internal/client/lds_test.go @@ -91,8 +91,12 @@ func TestGetRouteConfigNameFromListener(t *testing.T) { // and creates a v2Client using it. Then, it registers a watchLDS and tests // different LDS responses. func TestHandleLDSResponse(t *testing.T) { - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) tests := []struct { @@ -223,8 +227,12 @@ func TestHandleLDSResponse(t *testing.T) { // TestHandleLDSResponseWithoutWatch tests the case where the v2Client receives // an LDS response without a registered watcher. func TestHandleLDSResponseWithoutWatch(t *testing.T) { - _, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) if v2c.handleLDSResponse(goodLDSResponse1) == nil { @@ -242,8 +250,12 @@ func TestLDSWatchExpiryTimer(t *testing.T) { defaultWatchExpiryTimeout = oldWatchExpiryTimeout }() - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) // Wait till the request makes it to the fakeServer. This ensures that diff --git a/xds/internal/client/rds_test.go b/xds/internal/client/rds_test.go index 0989bc21..79673d2f 100644 --- a/xds/internal/client/rds_test.go +++ b/xds/internal/client/rds_test.go @@ -144,8 +144,12 @@ func TestGetClusterFromRouteConfiguration(t *testing.T) { // and creates a v2Client using it. Then, it registers an LDS and RDS watcher // and tests different RDS responses. func TestHandleRDSResponse(t *testing.T) { - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) // Register an LDS watcher, and wait till the request is sent out, the @@ -262,8 +266,12 @@ func TestHandleRDSResponse(t *testing.T) { // TestHandleRDSResponseWithoutLDSWatch tests the case where the v2Client // receives an RDS response without a registered LDS watcher. func TestHandleRDSResponseWithoutLDSWatch(t *testing.T) { - _, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) if v2c.handleRDSResponse(goodRDSResponse1) == nil { @@ -274,8 +282,12 @@ func TestHandleRDSResponseWithoutLDSWatch(t *testing.T) { // TestHandleRDSResponseWithoutRDSWatch tests the case where the v2Client // receives an RDS response without a registered RDS watcher. func TestHandleRDSResponseWithoutRDSWatch(t *testing.T) { - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) // Register an LDS watcher, and wait till the request is sent out, the @@ -319,9 +331,12 @@ type testOp struct { func testRDSCaching(t *testing.T, testOps []testOp, errCh chan error) { t.Helper() - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() - + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...") @@ -450,9 +465,12 @@ func TestRDSWatchExpiryTimer(t *testing.T) { defaultWatchExpiryTimeout = oldWatchExpiryTimeout }() - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() - + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...") diff --git a/xds/internal/client/v2client_test.go b/xds/internal/client/v2client_test.go index bd022dc9..867a317d 100644 --- a/xds/internal/client/v2client_test.go +++ b/xds/internal/client/v2client_test.go @@ -371,8 +371,12 @@ var ( // TestV2ClientBackoffAfterRecvError verifies if the v2Client backoffs when it // encounters a Recv error while receiving an LDS response. func TestV2ClientBackoffAfterRecvError(t *testing.T) { - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() // Override the v2Client backoff function with this, so that we can verify // that a backoff actually was triggerred. @@ -412,9 +416,12 @@ func TestV2ClientBackoffAfterRecvError(t *testing.T) { // encountered a Recv() error, and is expected to send out xDS requests for // registered watchers once it comes back up again. func TestV2ClientRetriesAfterBrokenStream(t *testing.T) { - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() - + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...") @@ -457,9 +464,12 @@ func TestV2ClientRetriesAfterBrokenStream(t *testing.T) { // TestV2ClientCancelWatch verifies that the registered watch callback is not // invoked if a response is received after the watcher is cancelled. func TestV2ClientCancelWatch(t *testing.T) { - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() - + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...")