From d01454da4a85ea42597210d6de8dac1d8fbc52c6 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 17 Dec 2019 11:42:43 -0800 Subject: [PATCH] xds: Improve balancere/xds_test.go. (#3256) --- xds/internal/balancer/xds_test.go | 874 ++++++++++------------- xds/internal/testutils/channel.go | 7 +- xds/internal/testutils/fakexds/client.go | 34 +- 3 files changed, 415 insertions(+), 500 deletions(-) diff --git a/xds/internal/balancer/xds_test.go b/xds/internal/balancer/xds_test.go index efa8a9cd..2e07ec2f 100644 --- a/xds/internal/balancer/xds_test.go +++ b/xds/internal/balancer/xds_test.go @@ -21,12 +21,12 @@ package balancer import ( "bytes" "encoding/json" + "fmt" "reflect" "sync" "testing" "time" - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" "github.com/golang/protobuf/jsonpb" wrapperspb "github.com/golang/protobuf/ptypes/wrappers" @@ -42,7 +42,8 @@ import ( "google.golang.org/grpc/xds/internal/balancer/lrs" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/bootstrap" - "google.golang.org/grpc/xds/internal/client/fakexds" + "google.golang.org/grpc/xds/internal/testutils" + "google.golang.org/grpc/xds/internal/testutils/fakexds" ) var lbABuilder = &balancerABuilder{} @@ -87,10 +88,6 @@ var ( specialAddrForBalancerA = resolver.Address{Addr: "this.is.balancer.A"} specialAddrForBalancerB = resolver.Address{Addr: "this.is.balancer.B"} - - // mu protects the access of latestFakeEdsBalancer - mu sync.Mutex - latestFakeEdsBalancer *fakeEDSBalancer ) type balancerABuilder struct { @@ -100,7 +97,7 @@ type balancerABuilder struct { func (b *balancerABuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { b.mu.Lock() - b.lastBalancer = &balancerA{cc: cc, subconnStateChange: make(chan *scStateChange, 10)} + b.lastBalancer = &balancerA{cc: cc, subconnStateChange: testutils.NewChannelWithSize(10)} b.mu.Unlock() return b.lastBalancer } @@ -138,11 +135,11 @@ func (*balancerBBuilder) Name() string { // the test to inspect. type balancerA struct { cc balancer.ClientConn - subconnStateChange chan *scStateChange + subconnStateChange *testutils.Channel } func (b *balancerA) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { - b.subconnStateChange <- &scStateChange{sc: sc, state: state} + b.subconnStateChange.Send(&scStateChange{sc: sc, state: state}) } func (b *balancerA) HandleResolvedAddrs(addrs []resolver.Address, err error) { @@ -151,6 +148,10 @@ func (b *balancerA) HandleResolvedAddrs(addrs []resolver.Address, err error) { func (b *balancerA) Close() {} +func (b *balancerA) waitForSubConnStateChange(wantState *scStateChange) error { + return waitForSubConnStateChange(b.subconnStateChange, wantState) +} + // A fake balancer implementation which appends a unique address to the list of // resolved addresses received before attempting to create a SubConn. type balancerB struct { @@ -167,18 +168,30 @@ func (balancerB) HandleSubConnStateChange(sc balancer.SubConn, state connectivit func (balancerB) Close() {} func newTestClientConn() *testClientConn { - return &testClientConn{newSubConns: make(chan []resolver.Address, 10)} + return &testClientConn{newSubConns: testutils.NewChannelWithSize(10)} } type testClientConn struct { - newSubConns chan []resolver.Address + newSubConns *testutils.Channel } func (t *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { - t.newSubConns <- addrs + t.newSubConns.Send(addrs) return nil, nil } +func (t *testClientConn) waitForNewSubConns(wantAddrs []resolver.Address) error { + val, err := t.newSubConns.Receive() + if err != nil { + return fmt.Errorf("error waiting for subconns: %v", err) + } + gotAddrs := val.([]resolver.Address) + if !reflect.DeepEqual(gotAddrs, wantAddrs) { + return fmt.Errorf("got subconn address %v, want %v", gotAddrs, wantAddrs) + } + return nil +} + func (testClientConn) RemoveSubConn(balancer.SubConn) {} func (testClientConn) UpdateBalancerState(connectivity.State, balancer.Picker) {} func (testClientConn) UpdateState(balancer.State) {} @@ -192,53 +205,57 @@ type scStateChange struct { type fakeEDSBalancer struct { cc balancer.ClientConn - edsChan chan *xdsclient.EDSUpdate - childPolicy chan *loadBalancingConfig - fallbackPolicy chan *loadBalancingConfig - subconnStateChange chan *scStateChange + childPolicy *testutils.Channel + subconnStateChange *testutils.Channel loadStore lrs.Store } func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { - f.subconnStateChange <- &scStateChange{sc: sc, state: state} -} - -func (f *fakeEDSBalancer) Close() { - mu.Lock() - defer mu.Unlock() - latestFakeEdsBalancer = nil -} - -func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) { - f.edsChan <- edsResp + f.subconnStateChange.Send(&scStateChange{sc: sc, state: state}) } func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) { - f.childPolicy <- &loadBalancingConfig{ - Name: name, - Config: config, + f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config}) +} + +func (f *fakeEDSBalancer) Close() {} +func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {} + +func (f *fakeEDSBalancer) waitForChildPolicy(wantPolicy *loadBalancingConfig) error { + val, err := f.childPolicy.Receive() + if err != nil { + return fmt.Errorf("error waiting for childPolicy: %v", err) } + gotPolicy := val.(*loadBalancingConfig) + if !reflect.DeepEqual(gotPolicy, wantPolicy) { + return fmt.Errorf("got childPolicy %v, want %v", gotPolicy, wantPolicy) + } + return nil +} + +func (f *fakeEDSBalancer) waitForSubConnStateChange(wantState *scStateChange) error { + return waitForSubConnStateChange(f.subconnStateChange, wantState) +} + +func waitForSubConnStateChange(ch *testutils.Channel, wantState *scStateChange) error { + val, err := ch.Receive() + if err != nil { + return fmt.Errorf("error waiting for subconnStateChange: %v", err) + } + gotState := val.(*scStateChange) + if !reflect.DeepEqual(gotState, wantState) { + return fmt.Errorf("got subconnStateChange %v, want %v", gotState, wantState) + } + return nil } func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface { - lb := &fakeEDSBalancer{ + return &fakeEDSBalancer{ cc: cc, - edsChan: make(chan *xdsclient.EDSUpdate, 10), - childPolicy: make(chan *loadBalancingConfig, 10), - fallbackPolicy: make(chan *loadBalancingConfig, 10), - subconnStateChange: make(chan *scStateChange, 10), + childPolicy: testutils.NewChannelWithSize(10), + subconnStateChange: testutils.NewChannelWithSize(10), loadStore: loadStore, } - mu.Lock() - latestFakeEdsBalancer = lb - mu.Unlock() - return lb -} - -func getLatestEdsBalancer() *fakeEDSBalancer { - mu.Lock() - defer mu.Unlock() - return latestFakeEdsBalancer } type fakeSubConn struct{} @@ -246,41 +263,7 @@ type fakeSubConn struct{} func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") } func (*fakeSubConn) Connect() { panic("implement me") } -type fakeXDSClient struct { - edsCbReceived chan struct{} // Will be closed when WatchEDS is called. - edsCb func(*xdsclient.EDSUpdate, error) -} - -func newFakeXDSClient() *fakeXDSClient { - return &fakeXDSClient{edsCbReceived: make(chan struct{})} -} - -func (c *fakeXDSClient) WatchEDS(clusterName string, edsCb func(*xdsclient.EDSUpdate, error)) (cancel func()) { - c.edsCb = edsCb - // WatchEDS is expected to be only called once in the test. If a test needs - // to call it multiple times, this will panic. - close(c.edsCbReceived) - return func() {} -} - -func (c *fakeXDSClient) callEDSCallback(u *xdsclient.EDSUpdate, err error) { - t := time.NewTimer(1 * time.Second) - select { - case <-c.edsCbReceived: - t.Stop() - case <-t.C: - panic("EDS callback is not received after 1 second") - } - c.edsCb(u, err) -} - -func (c *fakeXDSClient) ReportLoad(server string, clusterName string, loadStore lrs.Store) (cancel func()) { - return func() {} -} - -func (c *fakeXDSClient) Close() {} - -// TestXdsFallbackResolvedAddrs verifies that the fallback balancer specified +// TestXDSFallbackResolvedAddrs verifies that the fallback balancer specified // in the provided lbconfig is initialized, and that it receives the addresses // pushed by the resolver. // @@ -295,18 +278,17 @@ func (c *fakeXDSClient) Close() {} // unique address to the list of addresses it uses to create the SubConn. // * We also have a fake ClientConn which verifies that it receives the // expected address list. -func (s) TestXdsFallbackResolvedAddrs(t *testing.T) { +func (s) TestXDSFallbackResolvedAddrs(t *testing.T) { startupTimeout = 500 * time.Millisecond defer func() { startupTimeout = defaultTimeout }() builder := balancer.Get(edsName) cc := newTestClientConn() - b := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}) - lb, ok := b.(*edsBalancer) + edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer) if !ok { - t.Fatalf("builder.Build() returned a balancer of type %T, want *edsBalancer", b) + t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) } - defer lb.Close() + defer edsB.Close() tests := []struct { resolvedAddrs []resolver.Address @@ -322,454 +304,352 @@ func (s) TestXdsFallbackResolvedAddrs(t *testing.T) { }, } for _, test := range tests { - lb.UpdateClientConnState(balancer.ClientConnState{ + edsB.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: test.resolvedAddrs}, BalancerConfig: testLBConfigFooBar, }) - - select { - case gotAddrs := <-cc.newSubConns: - if !reflect.DeepEqual(gotAddrs, test.wantAddrs) { - t.Fatalf("got new subconn address %v, want %v", gotAddrs, test.wantAddrs) - } - case <-time.After(2 * time.Second): - t.Fatal("timeout when getting new subconn result") + if err := cc.waitForNewSubConns(test.wantAddrs); err != nil { + t.Fatal(err) } } } -func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) { +// waitForNewXDSClientWithEDSWatch makes sure that a new xdsClient is created +// with the provided name. It also make sure that the newly created client +// registers an eds watcher. +func waitForNewXDSClientWithEDSWatch(t *testing.T, ch *testutils.Channel, wantName string) *fakexds.Client { + t.Helper() + + val, err := ch.Receive() + if err != nil { + t.Fatalf("error when waiting for a new xds client: %v", err) + return nil + } + xdsC := val.(*fakexds.Client) + if xdsC.Name() != wantName { + t.Fatalf("xdsClient created to balancer: %v, want %v", xdsC.Name(), wantName) + return nil + } + _, err = xdsC.WaitForWatchEDS() + if err != nil { + t.Fatalf("xdsClient.WatchEDS failed with error: %v", err) + return nil + } + return xdsC +} + +// waitForNewEDSLB makes sure that a new edsLB is created by the top-level +// edsBalancer. +func waitForNewEDSLB(t *testing.T, ch *testutils.Channel) *fakeEDSBalancer { + t.Helper() + + val, err := ch.Receive() + if err != nil { + t.Fatalf("error when waiting for a new edsLB: %v", err) + return nil + } + return val.(*fakeEDSBalancer) +} + +// setup overrides the functions which are used to create the xdsClient and the +// edsLB, creates fake version of them and makes them available on the provided +// channels. The returned cancel function should be called by the test for +// cleanup. +func setup(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() { + origNewEDSBalancer := newEDSBalancer + newEDSBalancer = func(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface { + edsLB := newFakeEDSBalancer(cc, loadStore) + defer func() { edsLBCh.Send(edsLB) }() + return edsLB + } + + origXdsClientNew := xdsclientNew + xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) { + xdsC := fakexds.NewClientWithName(opts.Config.BalancerName) + defer func() { xdsClientCh.Send(xdsC) }() + return xdsC, nil + } + return func() { + newEDSBalancer = origNewEDSBalancer + xdsclientNew = origXdsClientNew + } +} + +// setupForFallback performs everything that setup does and in addition +// overrides the fallback startupTimeout to a small value to trigger fallback +// in tests. +func setupForFallback(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() { + cancel := setup(edsLBCh, xdsClientCh) startupTimeout = 500 * time.Millisecond - originalNewEDSBalancer := newEDSBalancer - newEDSBalancer = newFakeEDSBalancer - defer func() { + return func() { + cancel() startupTimeout = defaultTimeout - newEDSBalancer = originalNewEDSBalancer - }() + } +} + +// TestXDSConfigBalancerNameUpdate verifies different scenarios where the +// balancer name in the lbConfig is updated. +// +// The test does the following: +// * Builds a new xds balancer. +// * Since there is no xDS server to respond to requests from the xds client +// (created as part of the xds balancer), we expect the fallback policy to +// kick in. +// * Repeatedly pushes new ClientConnState which specifies different +// balancerName in the lbConfig. We expect xdsClient objects to created +// whenever the balancerName changes. We also expect a new edsLB to created +// the first time the client receives an edsUpdate. +func (s) TestXDSConfigBalancerNameUpdate(t *testing.T) { + edsLBCh := testutils.NewChannel() + xdsClientCh := testutils.NewChannel() + cancel := setupForFallback(edsLBCh, xdsClientCh) + defer cancel() builder := balancer.Get(edsName) cc := newTestClientConn() - lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer) + edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) if !ok { - t.Fatalf("unable to type assert to *edsBalancer") + t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) } - defer lb.Close() + defer edsB.Close() + addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} - lb.UpdateClientConnState(balancer.ClientConnState{ + edsB.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: addrs}, BalancerConfig: testLBConfigFooBar, }) - // verify fallback takes over - select { - case nsc := <-cc.newSubConns: - if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) { - t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA)) - } - case <-time.After(2 * time.Second): - t.Fatalf("timeout when getting new subconn result") + waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar) + // Verify that fallbackLB (fakeBalancerA) takes over, since the xdsClient + // receives no edsUpdate. + if err := cc.waitForNewSubConns(append(addrs, specialAddrForBalancerA)); err != nil { + t.Fatal(err) } - var cleanups []func() - defer func() { - for _, cleanup := range cleanups { - cleanup() - } - }() - // In the first iteration, an eds balancer takes over fallback balancer - // In the second iteration, a new xds client takes over previous one. for i := 0; i < 2; i++ { - td, cleanup := fakexds.StartServer(t) - cleanups = append(cleanups, cleanup) - workingLBConfig := &XDSConfig{ - BalancerName: td.Address, + balancerName := fmt.Sprintf("balancer-%d", i) + edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: addrs}, + BalancerConfig: &XDSConfig{ + BalancerName: balancerName, + ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, + FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, + EDSServiceName: testEDSClusterName, + }, + }) + + xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, balancerName) + xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil) + + // In the first iteration, an edsLB takes over from the fallbackLB. In the + // second iteration, a new xds client is created, but the same edsLB is used. + if i == 0 { + if _, err := edsLBCh.Receive(); err != nil { + t.Fatalf("edsBalancer did not create edsLB after receiveing EDS update: %v, %d", err, i) + } + } else { + if _, err := edsLBCh.Receive(); err == nil { + t.Fatal("edsBalancer created new edsLB when it was not expected to") + } + } + } +} + +// TestXDSConnfigChildPolicyUpdate verifies scenarios where the childPolicy +// section of the lbConfig is updated. +// +// The test does the following: +// * Builds a new xds balancer. +// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerA. +// Verifies that a new xdsClient is created. It then pushes a new edsUpdate +// through the fakexds client. Verifies that a new edsLB is created and it +// receives the expected childPolicy. +// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerB. +// This time around, we expect no new xdsClient or edsLB to be created. +// Instead, we expect the existing edsLB to receive the new child policy. +func (s) TestXDSConnfigChildPolicyUpdate(t *testing.T) { + edsLBCh := testutils.NewChannel() + xdsClientCh := testutils.NewChannel() + cancel := setup(edsLBCh, xdsClientCh) + defer cancel() + + builder := balancer.Get(edsName) + cc := newTestClientConn() + edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer) + if !ok { + t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) + } + defer edsB.Close() + + edsB.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &XDSConfig{ + BalancerName: testBalancerNameFooBar, + ChildPolicy: &loadBalancingConfig{ + Name: fakeBalancerA, + Config: json.RawMessage("{}"), + }, + EDSServiceName: testEDSClusterName, + }, + }) + xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar) + xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil) + edsLB := waitForNewEDSLB(t, edsLBCh) + edsLB.waitForChildPolicy(&loadBalancingConfig{ + Name: string(fakeBalancerA), + Config: json.RawMessage(`{}`), + }) + + edsB.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &XDSConfig{ + BalancerName: testBalancerNameFooBar, + ChildPolicy: &loadBalancingConfig{ + Name: fakeBalancerB, + Config: json.RawMessage("{}"), + }, + EDSServiceName: testEDSClusterName, + }, + }) + edsLB.waitForChildPolicy(&loadBalancingConfig{ + Name: string(fakeBalancerA), + Config: json.RawMessage(`{}`), + }) +} + +// TestXDSConfigFallBackUpdate verifies different scenarios where the fallback +// config part of the lbConfig is updated. +// +// The test does the following: +// * Builds a top-level edsBalancer +// * Fakes the xdsClient and the underlying edsLB implementations. +// * Sends a ClientConn update to the edsBalancer with a bogus balancerName. +// This will get the balancer into fallback monitoring, but since the +// startupTimeout package variable is not overridden to a small value, fallback +// will not kick-in as yet. +// * Sends another ClientConn update with fallback addresses. Still fallback +// would not have kicked in because the startupTimeout hasn't expired. +// * Sends an EDSUpdate through the fakexds.Client object. This will trigger +// the creation of an edsLB object. This is verified. +// * Trigger fallback by directly calling the loseContact method on the +// top-level edsBalancer. This should instantiate the fallbackLB and should +// send the appropriate subConns. +// * Update the fallback policy to specify and different fallback LB and make +// sure the new LB receives appropriate subConns. +func (s) TestXDSConfigFallBackUpdate(t *testing.T) { + edsLBCh := testutils.NewChannel() + xdsClientCh := testutils.NewChannel() + cancel := setup(edsLBCh, xdsClientCh) + defer cancel() + + builder := balancer.Get(edsName) + cc := newTestClientConn() + edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) + if !ok { + t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) + } + defer edsB.Close() + + bogusBalancerName := "wrong-balancer-name" + edsB.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &XDSConfig{ + BalancerName: bogusBalancerName, + FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, + }, + }) + xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, bogusBalancerName) + + addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} + edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: addrs}, + BalancerConfig: &XDSConfig{ + BalancerName: bogusBalancerName, + FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerB}, + }, + }) + + xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil) + if _, err := edsLBCh.Receive(); err != nil { + t.Fatalf("edsBalancer did not create edsLB after receiveing EDS update: %v", err) + } + + // Call loseContact explicitly, error in EDS callback is not handled. + // Eventually, this should call EDS ballback with an error that indicates + // "lost contact". + edsB.loseContact() + + // Verify that fallback (fakeBalancerB) takes over. + if err := cc.waitForNewSubConns(append(addrs, specialAddrForBalancerB)); err != nil { + t.Fatal(err) + } + + edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: addrs}, + BalancerConfig: &XDSConfig{ + BalancerName: bogusBalancerName, + FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, + }, + }) + + // Verify that fallbackLB (fakeBalancerA) takes over. + if err := cc.waitForNewSubConns(append(addrs, specialAddrForBalancerA)); err != nil { + t.Fatal(err) + } +} + +// TestXDSSubConnStateChange verifies if the top-level edsBalancer passes on +// the subConnStateChange to appropriate child balancers (it tests for edsLB +// and a fallbackLB). +func (s) TestXDSSubConnStateChange(t *testing.T) { + edsLBCh := testutils.NewChannel() + xdsClientCh := testutils.NewChannel() + cancel := setup(edsLBCh, xdsClientCh) + defer cancel() + + builder := balancer.Get(edsName) + cc := newTestClientConn() + edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) + if !ok { + t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) + } + defer edsB.Close() + + addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} + edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: addrs}, + BalancerConfig: &XDSConfig{ + BalancerName: testBalancerNameFooBar, ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, EDSServiceName: testEDSClusterName, - } - lb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: addrs}, - BalancerConfig: workingLBConfig, - }) - td.ResponseChan <- &fakexds.Response{Resp: testEDSResp} - - var j int - for j = 0; j < 10; j++ { - if edsLB := getLatestEdsBalancer(); edsLB != nil { // edsLB won't change between the two iterations - select { - case gotEDS := <-edsLB.edsChan: - want, err := xdsclient.ParseEDSRespProto(testClusterLoadAssignment) - if err != nil { - t.Fatalf("parsing wanted EDS response failed: %v", err) - } - if !cmp.Equal(gotEDS, want) { - t.Fatalf("edsBalancer got eds: %v, want %v", gotEDS, testClusterLoadAssignment) - } - case <-time.After(time.Second): - t.Fatal("haven't got EDS update after 1s") - } - break - } - time.Sleep(100 * time.Millisecond) - } - if j == 10 { - t.Fatal("edsBalancer instance has not been created or updated after 1s") - } - } -} - -// switch child policy, lb stays the same -func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) { - originalNewEDSBalancer := newEDSBalancer - newEDSBalancer = newFakeEDSBalancer - defer func() { - newEDSBalancer = originalNewEDSBalancer - }() - - builder := balancer.Get(edsName) - cc := newTestClientConn() - lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer) - if !ok { - t.Fatalf("unable to type assert to *edsBalancer") - } - defer lb.Close() - - var cleanups []func() - defer func() { - for _, cleanup := range cleanups { - cleanup() - } - }() - for _, test := range []struct { - cfg *XDSConfig - responseToSend *xdspb.DiscoveryResponse - expectedChildPolicy *loadBalancingConfig - }{ - { - cfg: &XDSConfig{ - ChildPolicy: &loadBalancingConfig{ - Name: fakeBalancerA, - Config: json.RawMessage("{}"), - }, - EDSServiceName: testEDSClusterName, - }, - responseToSend: testEDSResp, - expectedChildPolicy: &loadBalancingConfig{ - Name: string(fakeBalancerA), - Config: json.RawMessage(`{}`), - }, }, - { - cfg: &XDSConfig{ - ChildPolicy: &loadBalancingConfig{ - Name: fakeBalancerB, - Config: json.RawMessage("{}"), - }, - EDSServiceName: testEDSClusterName, - }, - expectedChildPolicy: &loadBalancingConfig{ - Name: string(fakeBalancerB), - Config: json.RawMessage(`{}`), - }, - }, - } { - td, cleanup := fakexds.StartServer(t) - cleanups = append(cleanups, cleanup) - test.cfg.BalancerName = td.Address - - lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: test.cfg}) - if test.responseToSend != nil { - td.ResponseChan <- &fakexds.Response{Resp: test.responseToSend} - } - var i int - for i = 0; i < 10; i++ { - if edsLB := getLatestEdsBalancer(); edsLB != nil { - select { - case childPolicy := <-edsLB.childPolicy: - if !reflect.DeepEqual(childPolicy, test.expectedChildPolicy) { - t.Fatalf("got childPolicy %v, want %v", childPolicy, test.expectedChildPolicy) - } - case <-time.After(time.Second): - t.Fatal("haven't got policy update after 1s") - } - break - } - time.Sleep(100 * time.Millisecond) - } - if i == 10 { - t.Fatal("edsBalancer instance has not been created or updated after 1s") - } - } -} - -// not in fallback mode, overwrite fallback info. -// in fallback mode, update config or switch balancer. -func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) { - originalNewEDSBalancer := newEDSBalancer - newEDSBalancer = newFakeEDSBalancer - defer func() { - newEDSBalancer = originalNewEDSBalancer - }() - - testXDSClient := newFakeXDSClient() - originalxdsclientNew := xdsclientNew - xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) { - return testXDSClient, nil - } - defer func() { - xdsclientNew = originalxdsclientNew - }() - - builder := balancer.Get(edsName) - cc := newTestClientConn() - lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) - if !ok { - t.Fatalf("unable to type assert to *edsBalancer") - } - defer lb.Close() - - cfg := XDSConfig{ - BalancerName: "wrong-balancer-name", - ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, - FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, - } - lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: &cfg}) - - addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} - cfg2 := cfg - cfg2.FallBackPolicy = &loadBalancingConfig{Name: fakeBalancerB} - lb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: addrs}, - BalancerConfig: &cfg2, }) - // Callback with an EDS update, the balancer will build a EDS balancer, not - // a fallback. - testXDSClient.callEDSCallback(xdsclient.ParseEDSRespProtoForTesting(testClusterLoadAssignment), nil) + xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar) + xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil) + edsLB := waitForNewEDSLB(t, edsLBCh) - var i int - for i = 0; i < 10; i++ { - if edsLB := getLatestEdsBalancer(); edsLB != nil { - break - } - time.Sleep(100 * time.Millisecond) - } - if i == 10 { - t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s") - } + fsc := &fakeSubConn{} + state := connectivity.Ready + edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state}) + edsLB.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state}) - // Call loseContact explicitly, error in EDS callback is not handled. - // Eventually, this should call EDS ballback with an error that indicates - // "lost contact". - lb.loseContact() - - // verify fallback balancer B takes over - select { - case nsc := <-cc.newSubConns: - if !reflect.DeepEqual(append(addrs, specialAddrForBalancerB), nsc) { - t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerB)) - } - case <-time.After(5 * time.Second): - t.Fatalf("timeout when getting new subconn result") - } - - cfg3 := cfg - cfg3.FallBackPolicy = &loadBalancingConfig{Name: fakeBalancerA} - lb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: addrs}, - BalancerConfig: &cfg3, - }) - - // verify fallback balancer A takes over - select { - case nsc := <-cc.newSubConns: - if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) { - t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA)) - } - case <-time.After(2 * time.Second): - t.Fatalf("timeout when getting new subconn result") - } -} - -func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) { - originalNewEDSBalancer := newEDSBalancer - newEDSBalancer = newFakeEDSBalancer - defer func() { - newEDSBalancer = originalNewEDSBalancer - }() - - testXDSClient := newFakeXDSClient() - originalxdsclientNew := xdsclientNew - xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) { - return testXDSClient, nil - } - defer func() { - xdsclientNew = originalxdsclientNew - }() - - builder := balancer.Get(edsName) - cc := newTestClientConn() - lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) - if !ok { - t.Fatalf("unable to type assert to *edsBalancer") - } - defer lb.Close() - - cfg := &XDSConfig{ - BalancerName: "wrong-balancer-name", - ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, - FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, - EDSServiceName: testEDSClusterName, - } - lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg}) - - // Callback with an EDS update, the balancer will build a EDS balancer, not - // a fallback. - testXDSClient.callEDSCallback(xdsclient.ParseEDSRespProtoForTesting(testClusterLoadAssignment), nil) - - expectedScStateChange := &scStateChange{ - sc: &fakeSubConn{}, - state: connectivity.Ready, - } - - var i int - for i = 0; i < 10; i++ { - if edsLB := getLatestEdsBalancer(); edsLB != nil { - lb.UpdateSubConnState(expectedScStateChange.sc, balancer.SubConnState{ConnectivityState: expectedScStateChange.state}) - select { - case scsc := <-edsLB.subconnStateChange: - if !reflect.DeepEqual(scsc, expectedScStateChange) { - t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange) - } - case <-time.After(time.Second): - t.Fatal("haven't got subconn state change after 1s") - } - break - } - time.Sleep(100 * time.Millisecond) - } - if i == 10 { - t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s") - } - - // lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record - // to make sure there's a new one created and get the pointer to it. + // lbABuilder maintains a pointer to the last balancerA that it created. We + // need to clear that to make sure a new one is created when we attempt to + // fallback in the next line. lbABuilder.clearLastBalancer() // Call loseContact explicitly, error in EDS callback is not handled. // Eventually, this should call EDS ballback with an error that indicates // "lost contact". - lb.loseContact() - - // switch to fallback - // fallback balancer A takes over - for i = 0; i < 10; i++ { - if fblb := lbABuilder.getLastBalancer(); fblb != nil { - lb.UpdateSubConnState(expectedScStateChange.sc, balancer.SubConnState{ConnectivityState: expectedScStateChange.state}) - select { - case scsc := <-fblb.subconnStateChange: - if !reflect.DeepEqual(scsc, expectedScStateChange) { - t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange) - } - case <-time.After(time.Second): - t.Fatal("haven't got subconn state change after 1s") - } - break - } - time.Sleep(100 * time.Millisecond) + edsB.loseContact() + // Verify that fallback (fakeBalancerA) takes over. + if err := cc.waitForNewSubConns(append(addrs, specialAddrForBalancerA)); err != nil { + t.Fatal(err) } - if i == 10 { - t.Fatal("balancerA instance has not been created after 1s") - } -} - -func (s) TestXdsBalancerFallBackSignalFromEdsBalancer(t *testing.T) { - originalNewEDSBalancer := newEDSBalancer - newEDSBalancer = newFakeEDSBalancer - defer func() { - newEDSBalancer = originalNewEDSBalancer - }() - - testXDSClient := newFakeXDSClient() - originalxdsclientNew := xdsclientNew - xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) { - return testXDSClient, nil - } - defer func() { - xdsclientNew = originalxdsclientNew - }() - - builder := balancer.Get(edsName) - cc := newTestClientConn() - lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) - if !ok { - t.Fatalf("unable to type assert to *edsBalancer") - } - defer lb.Close() - - cfg := &XDSConfig{ - BalancerName: "wrong-balancer-name", - ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, - FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, - } - lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg}) - - // Callback with an EDS update, the balancer will build a EDS balancer, not - // a fallback. - testXDSClient.callEDSCallback(xdsclient.ParseEDSRespProtoForTesting(testClusterLoadAssignment), nil) - - expectedScStateChange := &scStateChange{ - sc: &fakeSubConn{}, - state: connectivity.Ready, - } - - var i int - for i = 0; i < 10; i++ { - if edsLB := getLatestEdsBalancer(); edsLB != nil { - lb.UpdateSubConnState(expectedScStateChange.sc, balancer.SubConnState{ConnectivityState: expectedScStateChange.state}) - select { - case scsc := <-edsLB.subconnStateChange: - if !reflect.DeepEqual(scsc, expectedScStateChange) { - t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange) - } - case <-time.After(time.Second): - t.Fatal("haven't got subconn state change after 1s") - } - break - } - time.Sleep(100 * time.Millisecond) - } - if i == 10 { - t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s") - } - - // lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record - // to make sure there's a new one created and get the pointer to it. - lbABuilder.clearLastBalancer() - // Call loseContact explicitly, error in EDS callback is not handled. - // Eventually, this should call EDS ballback with an error that indicates - // "lost contact". - lb.loseContact() - - // switch to fallback - // fallback balancer A takes over - for i = 0; i < 10; i++ { - if fblb := lbABuilder.getLastBalancer(); fblb != nil { - lb.UpdateSubConnState(expectedScStateChange.sc, balancer.SubConnState{ConnectivityState: expectedScStateChange.state}) - select { - case scsc := <-fblb.subconnStateChange: - if !reflect.DeepEqual(scsc, expectedScStateChange) { - t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange) - } - case <-time.After(time.Second): - t.Fatal("haven't got subconn state change after 1s") - } - break - } - time.Sleep(100 * time.Millisecond) - } - if i == 10 { - t.Fatal("balancerA instance has not been created after 1s") + fblb := lbABuilder.getLastBalancer() + if fblb == nil { + t.Fatal("expected fallback balancerA to be built on fallback") } + edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state}) + fblb.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state}) } func TestXdsBalancerConfigParsing(t *testing.T) { diff --git a/xds/internal/testutils/channel.go b/xds/internal/testutils/channel.go index f0b04d0c..a2b34afa 100644 --- a/xds/internal/testutils/channel.go +++ b/xds/internal/testutils/channel.go @@ -66,5 +66,10 @@ func (cwt *Channel) Receive() (interface{}, error) { // NewChannel returns a new Channel. func NewChannel() *Channel { - return &Channel{ch: make(chan interface{}, DefaultChanBufferSize)} + return NewChannelWithSize(DefaultChanBufferSize) +} + +// NewChannelWithSize returns a new Channel with a buffer of bufSize. +func NewChannelWithSize(bufSize int) *Channel { + return &Channel{ch: make(chan interface{}, bufSize)} } diff --git a/xds/internal/testutils/fakexds/client.go b/xds/internal/testutils/fakexds/client.go index cbcc6bc6..d0eb97e9 100644 --- a/xds/internal/testutils/fakexds/client.go +++ b/xds/internal/testutils/fakexds/client.go @@ -21,6 +21,8 @@ package fakexds import ( + "sync" + "google.golang.org/grpc/xds/internal/balancer/lrs" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/testutils" @@ -29,18 +31,24 @@ import ( // Client is a fake implementation of an xds client. It exposes a bunch of // channels to signal the occurrence of various events. type Client struct { - serviceCb func(xdsclient.ServiceUpdate, error) - edsCb func(*xdsclient.EDSUpdate, error) + name string suWatchCh *testutils.Channel edsWatchCh *testutils.Channel suCancelCh *testutils.Channel edsCancelCh *testutils.Channel loadReportCh *testutils.Channel closeCh *testutils.Channel + + mu sync.Mutex + serviceCb func(xdsclient.ServiceUpdate, error) + edsCb func(*xdsclient.EDSUpdate, error) } // WatchService registers a LDS/RDS watch. func (xdsC *Client) WatchService(target string, callback func(xdsclient.ServiceUpdate, error)) func() { + xdsC.mu.Lock() + defer xdsC.mu.Unlock() + xdsC.serviceCb = callback xdsC.suWatchCh.Send(target) return func() { @@ -57,11 +65,17 @@ func (xdsC *Client) WaitForWatchService() (string, error) { // InvokeWatchServiceCallback invokes the registered service watch callback. func (xdsC *Client) InvokeWatchServiceCallback(cluster string, err error) { + xdsC.mu.Lock() + defer xdsC.mu.Unlock() + xdsC.serviceCb(xdsclient.ServiceUpdate{Cluster: cluster}, err) } // WatchEDS registers an EDS watch for provided clusterName. func (xdsC *Client) WatchEDS(clusterName string, callback func(*xdsclient.EDSUpdate, error)) (cancel func()) { + xdsC.mu.Lock() + defer xdsC.mu.Unlock() + xdsC.edsCb = callback xdsC.edsWatchCh.Send(clusterName) return func() { @@ -78,6 +92,9 @@ func (xdsC *Client) WaitForWatchEDS() (string, error) { // InvokeWatchEDSCallback invokes the registered edsWatch callback. func (xdsC *Client) InvokeWatchEDSCallback(update *xdsclient.EDSUpdate, err error) { + xdsC.mu.Lock() + defer xdsC.mu.Unlock() + xdsC.edsCb(update, err) } @@ -107,9 +124,22 @@ func (xdsC *Client) Close() { xdsC.closeCh.Send(nil) } +// Name returns the name of the xds client. +func (xdsC *Client) Name() string { + return xdsC.name +} + // NewClient returns a new fake xds client. func NewClient() *Client { + return NewClientWithName("") +} + +// NewClientWithName returns a new fake xds client with the provided name. This +// is used in cases where multiple clients are created in the tests and we need +// to make sure the client is created for the expected balancer name. +func NewClientWithName(name string) *Client { return &Client{ + name: name, suWatchCh: testutils.NewChannel(), edsWatchCh: testutils.NewChannel(), suCancelCh: testutils.NewChannel(),