xds: Don't override dialFunc in xdsClient in tests. (#3191)

* Add a helper to the fakexds package to return a ClientConn talking to
  the fake server.
* Tests will make use of this ClientConn wherever required, or they will
  directly pass the fake server's address as the balancerName to the
  xdsclient.New() function, thus exercising that code path as well.
* Add grpc.WithTimeout to list in vet.sh
This commit is contained in:
Easwar Swaminathan
2019-11-20 09:02:53 -08:00
committed by GitHub
parent d720ab346f
commit 24b653e8cc
8 changed files with 134 additions and 139 deletions

1
vet.sh
View File

@ -147,6 +147,7 @@ grpc.WithDecompressor
grpc.WithDialer
grpc.WithMaxMsgSize
grpc.WithServiceConfig
grpc.WithTimeout
http.CloseNotifier
naming.Resolver
naming.Update

View File

@ -21,7 +21,6 @@
package client
import (
"context"
"errors"
"fmt"
"sync"
@ -32,9 +31,6 @@ import (
"google.golang.org/grpc/xds/internal/client/bootstrap"
)
// For overriding in unittests.
var dialFunc = grpc.DialContext
// Options provides all parameters required for the creation of an xDS client.
type Options struct {
// Config contains a fully populated bootstrap config. It is the
@ -71,9 +67,7 @@ func New(opts Options) (*Client, error) {
}
dopts := append([]grpc.DialOption{opts.Config.Creds}, opts.DialOpts...)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cc, err := dialFunc(ctx, opts.Config.BalancerName, dopts...)
cc, err := grpc.Dial(opts.Config.BalancerName, dopts...)
if err != nil {
// An error from a non-blocking dial indicates something serious.
return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", opts.Config.BalancerName, err)

View File

@ -19,7 +19,6 @@
package client
import (
"context"
"errors"
"fmt"
"testing"
@ -32,20 +31,27 @@ import (
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
)
const balancerName = "dummyBalancer"
var validConfig = bootstrap.Config{
func clientOpts(balancerName string) Options {
return Options{
Config: bootstrap.Config{
BalancerName: balancerName,
Creds: grpc.WithInsecure(),
NodeProto: &corepb.Node{},
},
// WithTimeout is deprecated. But we are OK to call it here from the
// test, so we clearly know that the dial failed.
DialOpts: []grpc.DialOption{grpc.WithTimeout(5 * time.Second), grpc.WithBlock()},
}
}
func TestNew(t *testing.T) {
fakeServer, cleanup := fakexds.StartServer(t)
defer cleanup()
tests := []struct {
name string
opts Options
wantErr bool
wantDialOptsLen int
}{
{name: "empty-opts", opts: Options{}, wantErr: true},
{
@ -79,37 +85,14 @@ func TestNew(t *testing.T) {
wantErr: true,
},
{
name: "without-extra-dialoptions",
opts: Options{Config: validConfig},
name: "happy-case",
opts: clientOpts(fakeServer.Address),
wantErr: false,
wantDialOptsLen: 1,
},
{
name: "without-extra-dialoptions",
opts: Options{
Config: validConfig,
DialOpts: []grpc.DialOption{grpc.WithDisableRetry()},
},
wantErr: false,
wantDialOptsLen: 2,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
oldDialFunc := dialFunc
dialFunc = func(ctx context.Context, target string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
if target != balancerName {
t.Fatalf("got target: %v, want %v", target, balancerName)
}
if len(dopts) != test.wantDialOptsLen {
t.Fatalf("got %d dialOptions, want %d", len(dopts), test.wantDialOptsLen)
}
return grpc.DialContext(ctx, target, dopts...)
}
defer func() {
dialFunc = oldDialFunc
}()
if _, err := New(test.opts); (err != nil) != test.wantErr {
t.Fatalf("New(%+v) = %v, wantErr: %v", test.opts, err, test.wantErr)
}
@ -120,18 +103,10 @@ func TestNew(t *testing.T) {
// TestWatchService tests the happy case of registering a watcher for
// service updates and receiving a good update.
func TestWatchService(t *testing.T) {
fakeServer, fakeCC, cleanup := fakexds.StartClientAndServer(t)
fakeServer, cleanup := fakexds.StartServer(t)
defer cleanup()
oldDialFunc := dialFunc
dialFunc = func(_ context.Context, _ string, _ ...grpc.DialOption) (*grpc.ClientConn, error) {
return fakeCC, nil
}
defer func() {
dialFunc = oldDialFunc
}()
xdsClient, err := New(Options{Config: validConfig})
xdsClient, err := New(clientOpts(fakeServer.Address))
if err != nil {
t.Fatalf("New returned error: %v", err)
}
@ -176,18 +151,10 @@ func TestWatchService(t *testing.T) {
// registering a service update watcher. The underlying v2Client will timeout
// and will send us an error.
func TestWatchServiceWithNoResponseFromServer(t *testing.T) {
fakeServer, fakeCC, cleanup := fakexds.StartClientAndServer(t)
fakeServer, cleanup := fakexds.StartServer(t)
defer cleanup()
oldDialFunc := dialFunc
dialFunc = func(_ context.Context, _ string, _ ...grpc.DialOption) (*grpc.ClientConn, error) {
return fakeCC, nil
}
defer func() {
dialFunc = oldDialFunc
}()
xdsClient, err := New(Options{Config: validConfig})
xdsClient, err := New(clientOpts(fakeServer.Address))
if err != nil {
t.Fatalf("New returned error: %v", err)
}
@ -233,18 +200,10 @@ func TestWatchServiceWithNoResponseFromServer(t *testing.T) {
// TestWatchServiceEmptyRDS tests the case where the underlying
// v2Client receives an empty RDS response.
func TestWatchServiceEmptyRDS(t *testing.T) {
fakeServer, fakeCC, cleanup := fakexds.StartClientAndServer(t)
fakeServer, cleanup := fakexds.StartServer(t)
defer cleanup()
oldDialFunc := dialFunc
dialFunc = func(_ context.Context, _ string, _ ...grpc.DialOption) (*grpc.ClientConn, error) {
return fakeCC, nil
}
defer func() {
dialFunc = oldDialFunc
}()
xdsClient, err := New(Options{Config: validConfig})
xdsClient, err := New(clientOpts(fakeServer.Address))
if err != nil {
t.Fatalf("New returned error: %v", err)
}
@ -294,18 +253,10 @@ func TestWatchServiceEmptyRDS(t *testing.T) {
// received after the client is closed, and we make sure that the registered
// watcher callback is not invoked.
func TestWatchServiceWithClientClose(t *testing.T) {
fakeServer, fakeCC, cleanup := fakexds.StartClientAndServer(t)
fakeServer, cleanup := fakexds.StartServer(t)
defer cleanup()
oldDialFunc := dialFunc
dialFunc = func(_ context.Context, _ string, _ ...grpc.DialOption) (*grpc.ClientConn, error) {
return fakeCC, nil
}
defer func() {
dialFunc = oldDialFunc
}()
xdsClient, err := New(Options{Config: validConfig})
xdsClient, err := New(clientOpts(fakeServer.Address))
if err != nil {
t.Fatalf("New returned error: %v", err)
}

View File

@ -162,8 +162,12 @@ var (
)
func TestHandleEDSResponse(t *testing.T) {
fakeServer, client, cleanup := fakexds.StartClientAndServer(t)
defer cleanup()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
tests := []struct {
@ -271,8 +275,12 @@ func TestHandleEDSResponse(t *testing.T) {
// TestHandleEDSResponseWithoutEDSWatch tests the case where the v2Client
// receives an EDS response without a registered EDS watcher.
func TestHandleEDSResponseWithoutEDSWatch(t *testing.T) {
_, client, cleanup := fakexds.StartClientAndServer(t)
defer cleanup()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
if v2c.handleEDSResponse(goodEDSResponse1) == nil {
@ -287,9 +295,12 @@ func TestEDSWatchExpiryTimer(t *testing.T) {
defaultWatchExpiryTimeout = oldWatchExpiryTimeout
}()
fakeServer, client, cleanup := fakexds.StartClientAndServer(t)
defer cleanup()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")

View File

@ -60,19 +60,16 @@ type Server struct {
// ResponseChan is a buffered channel from which the fake server reads the
// responses that it must send out to the client.
ResponseChan chan *Response
// Address is the host:port on which the fake xdsServer is listening on.
Address string
}
// StartClientAndServer starts a fakexds.Server and creates a ClientConn
// talking to it. The returned cleanup function should be invoked by the caller
// once the test is done.
// TODO: Split this into two funcs, one to return a server and one to return a
// ClientConn connected to this server, and change tests accordingly.
func StartClientAndServer(t *testing.T) (*Server, *grpc.ClientConn, func()) {
// StartServer starts a fakexds.Server. The returned function should be invoked
// by the caller once the test is done.
func StartServer(t *testing.T) (*Server, func()) {
t.Helper()
var lis net.Listener
var err error
lis, err = net.Listen("tcp", "localhost:0")
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("net.Listen() failed: %v", err)
}
@ -81,30 +78,31 @@ func StartClientAndServer(t *testing.T) (*Server, *grpc.ClientConn, func()) {
fs := &Server{
RequestChan: make(chan *Request, defaultChannelBufferSize),
ResponseChan: make(chan *Response, defaultChannelBufferSize),
Address: lis.Addr().String(),
}
adsgrpc.RegisterAggregatedDiscoveryServiceServer(server, fs)
go server.Serve(lis)
t.Logf("Starting fake xDS server at %v...", lis.Addr().String())
defer func() {
if err != nil {
server.Stop()
lis.Close()
t.Logf("Starting fake xDS server at %v...", fs.Address)
return fs, func() { server.Stop() }
}
}()
// GetClientConn returns a grpc.ClientConn talking to the fake server. The
// returned function should be invoked by the caller once the test is done.
func (fs *Server) GetClientConn(t *testing.T) (*grpc.ClientConn, func()) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var cc *grpc.ClientConn
cc, err = grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock())
cc, err := grpc.DialContext(ctx, fs.Address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
t.Fatalf("grpc.DialContext(%s) failed: %v", lis.Addr().String(), err)
t.Fatalf("grpc.DialContext(%s) failed: %v", fs.Address, err)
}
t.Log("Started xDS gRPC client...")
return fs, cc, func() {
server.Stop()
lis.Close()
return cc, func() {
cc.Close()
}
}

View File

@ -91,8 +91,12 @@ func TestGetRouteConfigNameFromListener(t *testing.T) {
// and creates a v2Client using it. Then, it registers a watchLDS and tests
// different LDS responses.
func TestHandleLDSResponse(t *testing.T) {
fakeServer, client, cleanup := fakexds.StartClientAndServer(t)
defer cleanup()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
tests := []struct {
@ -223,8 +227,12 @@ func TestHandleLDSResponse(t *testing.T) {
// TestHandleLDSResponseWithoutWatch tests the case where the v2Client receives
// an LDS response without a registered watcher.
func TestHandleLDSResponseWithoutWatch(t *testing.T) {
_, client, cleanup := fakexds.StartClientAndServer(t)
defer cleanup()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
if v2c.handleLDSResponse(goodLDSResponse1) == nil {
@ -242,8 +250,12 @@ func TestLDSWatchExpiryTimer(t *testing.T) {
defaultWatchExpiryTimeout = oldWatchExpiryTimeout
}()
fakeServer, client, cleanup := fakexds.StartClientAndServer(t)
defer cleanup()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
// Wait till the request makes it to the fakeServer. This ensures that

View File

@ -144,8 +144,12 @@ func TestGetClusterFromRouteConfiguration(t *testing.T) {
// and creates a v2Client using it. Then, it registers an LDS and RDS watcher
// and tests different RDS responses.
func TestHandleRDSResponse(t *testing.T) {
fakeServer, client, cleanup := fakexds.StartClientAndServer(t)
defer cleanup()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
// Register an LDS watcher, and wait till the request is sent out, the
@ -262,8 +266,12 @@ func TestHandleRDSResponse(t *testing.T) {
// TestHandleRDSResponseWithoutLDSWatch tests the case where the v2Client
// receives an RDS response without a registered LDS watcher.
func TestHandleRDSResponseWithoutLDSWatch(t *testing.T) {
_, client, cleanup := fakexds.StartClientAndServer(t)
defer cleanup()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
if v2c.handleRDSResponse(goodRDSResponse1) == nil {
@ -274,8 +282,12 @@ func TestHandleRDSResponseWithoutLDSWatch(t *testing.T) {
// TestHandleRDSResponseWithoutRDSWatch tests the case where the v2Client
// receives an RDS response without a registered RDS watcher.
func TestHandleRDSResponseWithoutRDSWatch(t *testing.T) {
fakeServer, client, cleanup := fakexds.StartClientAndServer(t)
defer cleanup()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
// Register an LDS watcher, and wait till the request is sent out, the
@ -319,9 +331,12 @@ type testOp struct {
func testRDSCaching(t *testing.T, testOps []testOp, errCh chan error) {
t.Helper()
fakeServer, client, cleanup := fakexds.StartClientAndServer(t)
defer cleanup()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
@ -450,9 +465,12 @@ func TestRDSWatchExpiryTimer(t *testing.T) {
defaultWatchExpiryTimeout = oldWatchExpiryTimeout
}()
fakeServer, client, cleanup := fakexds.StartClientAndServer(t)
defer cleanup()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")

View File

@ -371,8 +371,12 @@ var (
// TestV2ClientBackoffAfterRecvError verifies if the v2Client backoffs when it
// encounters a Recv error while receiving an LDS response.
func TestV2ClientBackoffAfterRecvError(t *testing.T) {
fakeServer, client, cleanup := fakexds.StartClientAndServer(t)
defer cleanup()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
// Override the v2Client backoff function with this, so that we can verify
// that a backoff actually was triggerred.
@ -412,9 +416,12 @@ func TestV2ClientBackoffAfterRecvError(t *testing.T) {
// encountered a Recv() error, and is expected to send out xDS requests for
// registered watchers once it comes back up again.
func TestV2ClientRetriesAfterBrokenStream(t *testing.T) {
fakeServer, client, cleanup := fakexds.StartClientAndServer(t)
defer cleanup()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
@ -457,9 +464,12 @@ func TestV2ClientRetriesAfterBrokenStream(t *testing.T) {
// TestV2ClientCancelWatch verifies that the registered watch callback is not
// invoked if a response is received after the watcher is cancelled.
func TestV2ClientCancelWatch(t *testing.T) {
fakeServer, client, cleanup := fakexds.StartClientAndServer(t)
defer cleanup()
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")