diff --git a/vet.sh b/vet.sh index 88825e9d..b798aa25 100755 --- a/vet.sh +++ b/vet.sh @@ -147,6 +147,7 @@ grpc.WithDecompressor grpc.WithDialer grpc.WithMaxMsgSize grpc.WithServiceConfig +grpc.WithTimeout http.CloseNotifier naming.Resolver naming.Update diff --git a/xds/internal/client/client.go b/xds/internal/client/client.go index 4be85da8..b94ce751 100644 --- a/xds/internal/client/client.go +++ b/xds/internal/client/client.go @@ -21,7 +21,6 @@ package client import ( - "context" "errors" "fmt" "sync" @@ -32,9 +31,6 @@ import ( "google.golang.org/grpc/xds/internal/client/bootstrap" ) -// For overriding in unittests. -var dialFunc = grpc.DialContext - // Options provides all parameters required for the creation of an xDS client. type Options struct { // Config contains a fully populated bootstrap config. It is the @@ -71,9 +67,7 @@ func New(opts Options) (*Client, error) { } dopts := append([]grpc.DialOption{opts.Config.Creds}, opts.DialOpts...) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cc, err := dialFunc(ctx, opts.Config.BalancerName, dopts...) + cc, err := grpc.Dial(opts.Config.BalancerName, dopts...) if err != nil { // An error from a non-blocking dial indicates something serious. return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", opts.Config.BalancerName, err) diff --git a/xds/internal/client/client_test.go b/xds/internal/client/client_test.go index 03bc887d..df5fbfaf 100644 --- a/xds/internal/client/client_test.go +++ b/xds/internal/client/client_test.go @@ -19,7 +19,6 @@ package client import ( - "context" "errors" "fmt" "testing" @@ -32,20 +31,27 @@ import ( corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" ) -const balancerName = "dummyBalancer" - -var validConfig = bootstrap.Config{ - BalancerName: balancerName, - Creds: grpc.WithInsecure(), - NodeProto: &corepb.Node{}, +func clientOpts(balancerName string) Options { + return Options{ + Config: bootstrap.Config{ + BalancerName: balancerName, + Creds: grpc.WithInsecure(), + NodeProto: &corepb.Node{}, + }, + // WithTimeout is deprecated. But we are OK to call it here from the + // test, so we clearly know that the dial failed. + DialOpts: []grpc.DialOption{grpc.WithTimeout(5 * time.Second), grpc.WithBlock()}, + } } func TestNew(t *testing.T) { + fakeServer, cleanup := fakexds.StartServer(t) + defer cleanup() + tests := []struct { - name string - opts Options - wantErr bool - wantDialOptsLen int + name string + opts Options + wantErr bool }{ {name: "empty-opts", opts: Options{}, wantErr: true}, { @@ -79,37 +85,14 @@ func TestNew(t *testing.T) { wantErr: true, }, { - name: "without-extra-dialoptions", - opts: Options{Config: validConfig}, - wantErr: false, - wantDialOptsLen: 1, - }, - { - name: "without-extra-dialoptions", - opts: Options{ - Config: validConfig, - DialOpts: []grpc.DialOption{grpc.WithDisableRetry()}, - }, - wantErr: false, - wantDialOptsLen: 2, + name: "happy-case", + opts: clientOpts(fakeServer.Address), + wantErr: false, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - oldDialFunc := dialFunc - dialFunc = func(ctx context.Context, target string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { - if target != balancerName { - t.Fatalf("got target: %v, want %v", target, balancerName) - } - if len(dopts) != test.wantDialOptsLen { - t.Fatalf("got %d dialOptions, want %d", len(dopts), test.wantDialOptsLen) - } - return grpc.DialContext(ctx, target, dopts...) - } - defer func() { - dialFunc = oldDialFunc - }() if _, err := New(test.opts); (err != nil) != test.wantErr { t.Fatalf("New(%+v) = %v, wantErr: %v", test.opts, err, test.wantErr) } @@ -120,18 +103,10 @@ func TestNew(t *testing.T) { // TestWatchService tests the happy case of registering a watcher for // service updates and receiving a good update. func TestWatchService(t *testing.T) { - fakeServer, fakeCC, cleanup := fakexds.StartClientAndServer(t) + fakeServer, cleanup := fakexds.StartServer(t) defer cleanup() - oldDialFunc := dialFunc - dialFunc = func(_ context.Context, _ string, _ ...grpc.DialOption) (*grpc.ClientConn, error) { - return fakeCC, nil - } - defer func() { - dialFunc = oldDialFunc - }() - - xdsClient, err := New(Options{Config: validConfig}) + xdsClient, err := New(clientOpts(fakeServer.Address)) if err != nil { t.Fatalf("New returned error: %v", err) } @@ -176,18 +151,10 @@ func TestWatchService(t *testing.T) { // registering a service update watcher. The underlying v2Client will timeout // and will send us an error. func TestWatchServiceWithNoResponseFromServer(t *testing.T) { - fakeServer, fakeCC, cleanup := fakexds.StartClientAndServer(t) + fakeServer, cleanup := fakexds.StartServer(t) defer cleanup() - oldDialFunc := dialFunc - dialFunc = func(_ context.Context, _ string, _ ...grpc.DialOption) (*grpc.ClientConn, error) { - return fakeCC, nil - } - defer func() { - dialFunc = oldDialFunc - }() - - xdsClient, err := New(Options{Config: validConfig}) + xdsClient, err := New(clientOpts(fakeServer.Address)) if err != nil { t.Fatalf("New returned error: %v", err) } @@ -233,18 +200,10 @@ func TestWatchServiceWithNoResponseFromServer(t *testing.T) { // TestWatchServiceEmptyRDS tests the case where the underlying // v2Client receives an empty RDS response. func TestWatchServiceEmptyRDS(t *testing.T) { - fakeServer, fakeCC, cleanup := fakexds.StartClientAndServer(t) + fakeServer, cleanup := fakexds.StartServer(t) defer cleanup() - oldDialFunc := dialFunc - dialFunc = func(_ context.Context, _ string, _ ...grpc.DialOption) (*grpc.ClientConn, error) { - return fakeCC, nil - } - defer func() { - dialFunc = oldDialFunc - }() - - xdsClient, err := New(Options{Config: validConfig}) + xdsClient, err := New(clientOpts(fakeServer.Address)) if err != nil { t.Fatalf("New returned error: %v", err) } @@ -294,18 +253,10 @@ func TestWatchServiceEmptyRDS(t *testing.T) { // received after the client is closed, and we make sure that the registered // watcher callback is not invoked. func TestWatchServiceWithClientClose(t *testing.T) { - fakeServer, fakeCC, cleanup := fakexds.StartClientAndServer(t) + fakeServer, cleanup := fakexds.StartServer(t) defer cleanup() - oldDialFunc := dialFunc - dialFunc = func(_ context.Context, _ string, _ ...grpc.DialOption) (*grpc.ClientConn, error) { - return fakeCC, nil - } - defer func() { - dialFunc = oldDialFunc - }() - - xdsClient, err := New(Options{Config: validConfig}) + xdsClient, err := New(clientOpts(fakeServer.Address)) if err != nil { t.Fatalf("New returned error: %v", err) } diff --git a/xds/internal/client/eds_test.go b/xds/internal/client/eds_test.go index 26b50951..2b46f445 100644 --- a/xds/internal/client/eds_test.go +++ b/xds/internal/client/eds_test.go @@ -162,8 +162,12 @@ var ( ) func TestHandleEDSResponse(t *testing.T) { - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) tests := []struct { @@ -271,8 +275,12 @@ func TestHandleEDSResponse(t *testing.T) { // TestHandleEDSResponseWithoutEDSWatch tests the case where the v2Client // receives an EDS response without a registered EDS watcher. func TestHandleEDSResponseWithoutEDSWatch(t *testing.T) { - _, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) if v2c.handleEDSResponse(goodEDSResponse1) == nil { @@ -287,9 +295,12 @@ func TestEDSWatchExpiryTimer(t *testing.T) { defaultWatchExpiryTimeout = oldWatchExpiryTimeout }() - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() - + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...") diff --git a/xds/internal/client/fakexds/fakexds.go b/xds/internal/client/fakexds/fakexds.go index 0b9386bf..814100fa 100644 --- a/xds/internal/client/fakexds/fakexds.go +++ b/xds/internal/client/fakexds/fakexds.go @@ -60,19 +60,16 @@ type Server struct { // ResponseChan is a buffered channel from which the fake server reads the // responses that it must send out to the client. ResponseChan chan *Response + // Address is the host:port on which the fake xdsServer is listening on. + Address string } -// StartClientAndServer starts a fakexds.Server and creates a ClientConn -// talking to it. The returned cleanup function should be invoked by the caller -// once the test is done. -// TODO: Split this into two funcs, one to return a server and one to return a -// ClientConn connected to this server, and change tests accordingly. -func StartClientAndServer(t *testing.T) (*Server, *grpc.ClientConn, func()) { +// StartServer starts a fakexds.Server. The returned function should be invoked +// by the caller once the test is done. +func StartServer(t *testing.T) (*Server, func()) { t.Helper() - var lis net.Listener - var err error - lis, err = net.Listen("tcp", "localhost:0") + lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("net.Listen() failed: %v", err) } @@ -81,30 +78,31 @@ func StartClientAndServer(t *testing.T) (*Server, *grpc.ClientConn, func()) { fs := &Server{ RequestChan: make(chan *Request, defaultChannelBufferSize), ResponseChan: make(chan *Response, defaultChannelBufferSize), + Address: lis.Addr().String(), } adsgrpc.RegisterAggregatedDiscoveryServiceServer(server, fs) go server.Serve(lis) - t.Logf("Starting fake xDS server at %v...", lis.Addr().String()) - defer func() { - if err != nil { - server.Stop() - lis.Close() - } - }() + t.Logf("Starting fake xDS server at %v...", fs.Address) + + return fs, func() { server.Stop() } +} + +// GetClientConn returns a grpc.ClientConn talking to the fake server. The +// returned function should be invoked by the caller once the test is done. +func (fs *Server) GetClientConn(t *testing.T) (*grpc.ClientConn, func()) { + t.Helper() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - var cc *grpc.ClientConn - cc, err = grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock()) + cc, err := grpc.DialContext(ctx, fs.Address, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { - t.Fatalf("grpc.DialContext(%s) failed: %v", lis.Addr().String(), err) + t.Fatalf("grpc.DialContext(%s) failed: %v", fs.Address, err) } t.Log("Started xDS gRPC client...") - return fs, cc, func() { - server.Stop() - lis.Close() + return cc, func() { + cc.Close() } } diff --git a/xds/internal/client/lds_test.go b/xds/internal/client/lds_test.go index 535e1665..e76b6f79 100644 --- a/xds/internal/client/lds_test.go +++ b/xds/internal/client/lds_test.go @@ -91,8 +91,12 @@ func TestGetRouteConfigNameFromListener(t *testing.T) { // and creates a v2Client using it. Then, it registers a watchLDS and tests // different LDS responses. func TestHandleLDSResponse(t *testing.T) { - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) tests := []struct { @@ -223,8 +227,12 @@ func TestHandleLDSResponse(t *testing.T) { // TestHandleLDSResponseWithoutWatch tests the case where the v2Client receives // an LDS response without a registered watcher. func TestHandleLDSResponseWithoutWatch(t *testing.T) { - _, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) if v2c.handleLDSResponse(goodLDSResponse1) == nil { @@ -242,8 +250,12 @@ func TestLDSWatchExpiryTimer(t *testing.T) { defaultWatchExpiryTimeout = oldWatchExpiryTimeout }() - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) // Wait till the request makes it to the fakeServer. This ensures that diff --git a/xds/internal/client/rds_test.go b/xds/internal/client/rds_test.go index 0989bc21..79673d2f 100644 --- a/xds/internal/client/rds_test.go +++ b/xds/internal/client/rds_test.go @@ -144,8 +144,12 @@ func TestGetClusterFromRouteConfiguration(t *testing.T) { // and creates a v2Client using it. Then, it registers an LDS and RDS watcher // and tests different RDS responses. func TestHandleRDSResponse(t *testing.T) { - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) // Register an LDS watcher, and wait till the request is sent out, the @@ -262,8 +266,12 @@ func TestHandleRDSResponse(t *testing.T) { // TestHandleRDSResponseWithoutLDSWatch tests the case where the v2Client // receives an RDS response without a registered LDS watcher. func TestHandleRDSResponseWithoutLDSWatch(t *testing.T) { - _, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) if v2c.handleRDSResponse(goodRDSResponse1) == nil { @@ -274,8 +282,12 @@ func TestHandleRDSResponseWithoutLDSWatch(t *testing.T) { // TestHandleRDSResponseWithoutRDSWatch tests the case where the v2Client // receives an RDS response without a registered RDS watcher. func TestHandleRDSResponseWithoutRDSWatch(t *testing.T) { - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) // Register an LDS watcher, and wait till the request is sent out, the @@ -319,9 +331,12 @@ type testOp struct { func testRDSCaching(t *testing.T, testOps []testOp, errCh chan error) { t.Helper() - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() - + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...") @@ -450,9 +465,12 @@ func TestRDSWatchExpiryTimer(t *testing.T) { defaultWatchExpiryTimeout = oldWatchExpiryTimeout }() - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() - + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...") diff --git a/xds/internal/client/v2client_test.go b/xds/internal/client/v2client_test.go index bd022dc9..867a317d 100644 --- a/xds/internal/client/v2client_test.go +++ b/xds/internal/client/v2client_test.go @@ -371,8 +371,12 @@ var ( // TestV2ClientBackoffAfterRecvError verifies if the v2Client backoffs when it // encounters a Recv error while receiving an LDS response. func TestV2ClientBackoffAfterRecvError(t *testing.T) { - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() // Override the v2Client backoff function with this, so that we can verify // that a backoff actually was triggerred. @@ -412,9 +416,12 @@ func TestV2ClientBackoffAfterRecvError(t *testing.T) { // encountered a Recv() error, and is expected to send out xDS requests for // registered watchers once it comes back up again. func TestV2ClientRetriesAfterBrokenStream(t *testing.T) { - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() - + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...") @@ -457,9 +464,12 @@ func TestV2ClientRetriesAfterBrokenStream(t *testing.T) { // TestV2ClientCancelWatch verifies that the registered watch callback is not // invoked if a response is received after the watcher is cancelled. func TestV2ClientCancelWatch(t *testing.T) { - fakeServer, client, cleanup := fakexds.StartClientAndServer(t) - defer cleanup() - + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) defer v2c.close() t.Log("Started xds v2Client...")