xds: Improve balancer/xds_test.go. (#3256)

Easwar Swaminathan
2019-12-17 11:42:43 -08:00
committed by GitHub
parent f7b39d80aa
commit d01454da4a
3 changed files with 415 additions and 500 deletions
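At a high level, this commit replaces hand-rolled `chan` plumbing, `select`/`time.After` blocks, and sleep-based polling loops (like the removed `getLatestEdsBalancer`) with the `testutils.Channel` helper, whose `Receive` returns an error on timeout. The following standalone sketch re-implements that pattern to show the idea; the 1s timeout and the names here are illustrative, not the internal package's actual values.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Channel is a standalone re-implementation of the testutils.Channel
// pattern the tests migrate to: a buffered channel of interface{} whose
// Receive has a built-in timeout, replacing per-test select/time.After
// blocks and sleep-based polling loops.
type Channel struct{ ch chan interface{} }

// Send queues a value; the buffer keeps producers from blocking.
func (c *Channel) Send(value interface{}) { c.ch <- value }

// Receive returns the next value, or an error if none arrives in time.
// (The 1s timeout is illustrative; the real helper's may differ.)
func (c *Channel) Receive() (interface{}, error) {
	select {
	case got := <-c.ch:
		return got, nil
	case <-time.After(time.Second):
		return nil, errors.New("timed out waiting for value")
	}
}

// NewChannelWithSize mirrors the constructor this commit adds.
func NewChannelWithSize(bufSize int) *Channel {
	return &Channel{ch: make(chan interface{}, bufSize)}
}

func main() {
	ch := NewChannelWithSize(10)
	go ch.Send("subconn state change")
	val, err := ch.Receive()
	fmt.Println(val, err) // "subconn state change <nil>"
}
```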


@@ -21,12 +21,12 @@ package balancer
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt"
"reflect" "reflect"
"sync" "sync"
"testing" "testing"
"time" "time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/jsonpb"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers" wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
@@ -42,7 +42,8 @@ import (
"google.golang.org/grpc/xds/internal/balancer/lrs" "google.golang.org/grpc/xds/internal/balancer/lrs"
xdsclient "google.golang.org/grpc/xds/internal/client" xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap" "google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/client/fakexds" "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakexds"
) )
var lbABuilder = &balancerABuilder{} var lbABuilder = &balancerABuilder{}
@@ -87,10 +88,6 @@ var (
specialAddrForBalancerA = resolver.Address{Addr: "this.is.balancer.A"} specialAddrForBalancerA = resolver.Address{Addr: "this.is.balancer.A"}
specialAddrForBalancerB = resolver.Address{Addr: "this.is.balancer.B"} specialAddrForBalancerB = resolver.Address{Addr: "this.is.balancer.B"}
// mu protects the access of latestFakeEdsBalancer
mu sync.Mutex
latestFakeEdsBalancer *fakeEDSBalancer
) )
type balancerABuilder struct { type balancerABuilder struct {
@@ -100,7 +97,7 @@ type balancerABuilder struct {
func (b *balancerABuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { func (b *balancerABuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
b.mu.Lock() b.mu.Lock()
b.lastBalancer = &balancerA{cc: cc, subconnStateChange: make(chan *scStateChange, 10)} b.lastBalancer = &balancerA{cc: cc, subconnStateChange: testutils.NewChannelWithSize(10)}
b.mu.Unlock() b.mu.Unlock()
return b.lastBalancer return b.lastBalancer
} }
@@ -138,11 +135,11 @@ func (*balancerBBuilder) Name() string {
// the test to inspect. // the test to inspect.
type balancerA struct { type balancerA struct {
cc balancer.ClientConn cc balancer.ClientConn
subconnStateChange chan *scStateChange subconnStateChange *testutils.Channel
} }
func (b *balancerA) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { func (b *balancerA) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
b.subconnStateChange <- &scStateChange{sc: sc, state: state} b.subconnStateChange.Send(&scStateChange{sc: sc, state: state})
} }
func (b *balancerA) HandleResolvedAddrs(addrs []resolver.Address, err error) { func (b *balancerA) HandleResolvedAddrs(addrs []resolver.Address, err error) {
@@ -151,6 +148,10 @@ func (b *balancerA) HandleResolvedAddrs(addrs []resolver.Address, err error) {
func (b *balancerA) Close() {} func (b *balancerA) Close() {}
func (b *balancerA) waitForSubConnStateChange(wantState *scStateChange) error {
return waitForSubConnStateChange(b.subconnStateChange, wantState)
}
// A fake balancer implementation which appends a unique address to the list of // A fake balancer implementation which appends a unique address to the list of
// resolved addresses received before attempting to create a SubConn. // resolved addresses received before attempting to create a SubConn.
type balancerB struct { type balancerB struct {
@@ -167,18 +168,30 @@ func (balancerB) HandleSubConnStateChange(sc balancer.SubConn, state connectivit
func (balancerB) Close() {} func (balancerB) Close() {}
func newTestClientConn() *testClientConn { func newTestClientConn() *testClientConn {
return &testClientConn{newSubConns: make(chan []resolver.Address, 10)} return &testClientConn{newSubConns: testutils.NewChannelWithSize(10)}
} }
type testClientConn struct { type testClientConn struct {
newSubConns chan []resolver.Address newSubConns *testutils.Channel
} }
func (t *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { func (t *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
t.newSubConns <- addrs t.newSubConns.Send(addrs)
return nil, nil return nil, nil
} }
func (t *testClientConn) waitForNewSubConns(wantAddrs []resolver.Address) error {
val, err := t.newSubConns.Receive()
if err != nil {
return fmt.Errorf("error waiting for subconns: %v", err)
}
gotAddrs := val.([]resolver.Address)
if !reflect.DeepEqual(gotAddrs, wantAddrs) {
return fmt.Errorf("got subconn address %v, want %v", gotAddrs, wantAddrs)
}
return nil
}
func (testClientConn) RemoveSubConn(balancer.SubConn) {} func (testClientConn) RemoveSubConn(balancer.SubConn) {}
func (testClientConn) UpdateBalancerState(connectivity.State, balancer.Picker) {} func (testClientConn) UpdateBalancerState(connectivity.State, balancer.Picker) {}
func (testClientConn) UpdateState(balancer.State) {} func (testClientConn) UpdateState(balancer.State) {}
@@ -192,53 +205,57 @@ type scStateChange struct {
type fakeEDSBalancer struct { type fakeEDSBalancer struct {
cc balancer.ClientConn cc balancer.ClientConn
edsChan chan *xdsclient.EDSUpdate childPolicy *testutils.Channel
childPolicy chan *loadBalancingConfig subconnStateChange *testutils.Channel
fallbackPolicy chan *loadBalancingConfig
subconnStateChange chan *scStateChange
loadStore lrs.Store loadStore lrs.Store
} }
func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
f.subconnStateChange <- &scStateChange{sc: sc, state: state} f.subconnStateChange.Send(&scStateChange{sc: sc, state: state})
}
func (f *fakeEDSBalancer) Close() {
mu.Lock()
defer mu.Unlock()
latestFakeEdsBalancer = nil
}
func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {
f.edsChan <- edsResp
} }
func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) { func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) {
f.childPolicy <- &loadBalancingConfig{ f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config})
Name: name,
Config: config,
} }
func (f *fakeEDSBalancer) Close() {}
func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {}
func (f *fakeEDSBalancer) waitForChildPolicy(wantPolicy *loadBalancingConfig) error {
val, err := f.childPolicy.Receive()
if err != nil {
return fmt.Errorf("error waiting for childPolicy: %v", err)
}
gotPolicy := val.(*loadBalancingConfig)
if !reflect.DeepEqual(gotPolicy, wantPolicy) {
return fmt.Errorf("got childPolicy %v, want %v", gotPolicy, wantPolicy)
}
return nil
}
func (f *fakeEDSBalancer) waitForSubConnStateChange(wantState *scStateChange) error {
return waitForSubConnStateChange(f.subconnStateChange, wantState)
}
func waitForSubConnStateChange(ch *testutils.Channel, wantState *scStateChange) error {
val, err := ch.Receive()
if err != nil {
return fmt.Errorf("error waiting for subconnStateChange: %v", err)
}
gotState := val.(*scStateChange)
if !reflect.DeepEqual(gotState, wantState) {
return fmt.Errorf("got subconnStateChange %v, want %v", gotState, wantState)
}
return nil
} }
func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface { func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface {
lb := &fakeEDSBalancer{ return &fakeEDSBalancer{
cc: cc, cc: cc,
edsChan: make(chan *xdsclient.EDSUpdate, 10), childPolicy: testutils.NewChannelWithSize(10),
childPolicy: make(chan *loadBalancingConfig, 10), subconnStateChange: testutils.NewChannelWithSize(10),
fallbackPolicy: make(chan *loadBalancingConfig, 10),
subconnStateChange: make(chan *scStateChange, 10),
loadStore: loadStore, loadStore: loadStore,
} }
mu.Lock()
latestFakeEdsBalancer = lb
mu.Unlock()
return lb
}
func getLatestEdsBalancer() *fakeEDSBalancer {
mu.Lock()
defer mu.Unlock()
return latestFakeEdsBalancer
} }
type fakeSubConn struct{} type fakeSubConn struct{}
@@ -246,41 +263,7 @@ type fakeSubConn struct{}
func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") } func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") }
func (*fakeSubConn) Connect() { panic("implement me") } func (*fakeSubConn) Connect() { panic("implement me") }
type fakeXDSClient struct { // TestXDSFallbackResolvedAddrs verifies that the fallback balancer specified
edsCbReceived chan struct{} // Will be closed when WatchEDS is called.
edsCb func(*xdsclient.EDSUpdate, error)
}
func newFakeXDSClient() *fakeXDSClient {
return &fakeXDSClient{edsCbReceived: make(chan struct{})}
}
func (c *fakeXDSClient) WatchEDS(clusterName string, edsCb func(*xdsclient.EDSUpdate, error)) (cancel func()) {
c.edsCb = edsCb
// WatchEDS is expected to be only called once in the test. If a test needs
// to call it multiple times, this will panic.
close(c.edsCbReceived)
return func() {}
}
func (c *fakeXDSClient) callEDSCallback(u *xdsclient.EDSUpdate, err error) {
t := time.NewTimer(1 * time.Second)
select {
case <-c.edsCbReceived:
t.Stop()
case <-t.C:
panic("EDS callback is not received after 1 second")
}
c.edsCb(u, err)
}
func (c *fakeXDSClient) ReportLoad(server string, clusterName string, loadStore lrs.Store) (cancel func()) {
return func() {}
}
func (c *fakeXDSClient) Close() {}
// TestXdsFallbackResolvedAddrs verifies that the fallback balancer specified
// in the provided lbconfig is initialized, and that it receives the addresses // in the provided lbconfig is initialized, and that it receives the addresses
// pushed by the resolver. // pushed by the resolver.
// //
@@ -295,18 +278,17 @@ func (c *fakeXDSClient) Close() {}
// unique address to the list of addresses it uses to create the SubConn. // unique address to the list of addresses it uses to create the SubConn.
// * We also have a fake ClientConn which verifies that it receives the // * We also have a fake ClientConn which verifies that it receives the
// expected address list. // expected address list.
func (s) TestXdsFallbackResolvedAddrs(t *testing.T) { func (s) TestXDSFallbackResolvedAddrs(t *testing.T) {
startupTimeout = 500 * time.Millisecond startupTimeout = 500 * time.Millisecond
defer func() { startupTimeout = defaultTimeout }() defer func() { startupTimeout = defaultTimeout }()
builder := balancer.Get(edsName) builder := balancer.Get(edsName)
cc := newTestClientConn() cc := newTestClientConn()
b := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}) edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer)
lb, ok := b.(*edsBalancer)
if !ok { if !ok {
t.Fatalf("builder.Build() returned a balancer of type %T, want *edsBalancer", b) t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
} }
defer lb.Close() defer edsB.Close()
tests := []struct { tests := []struct {
resolvedAddrs []resolver.Address resolvedAddrs []resolver.Address
@@ -322,454 +304,352 @@ func (s) TestXdsFallbackResolvedAddrs(t *testing.T) {
}, },
} }
for _, test := range tests { for _, test := range tests {
lb.UpdateClientConnState(balancer.ClientConnState{ edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: test.resolvedAddrs}, ResolverState: resolver.State{Addresses: test.resolvedAddrs},
BalancerConfig: testLBConfigFooBar, BalancerConfig: testLBConfigFooBar,
}) })
if err := cc.waitForNewSubConns(test.wantAddrs); err != nil {
select { t.Fatal(err)
case gotAddrs := <-cc.newSubConns:
if !reflect.DeepEqual(gotAddrs, test.wantAddrs) {
t.Fatalf("got new subconn address %v, want %v", gotAddrs, test.wantAddrs)
}
case <-time.After(2 * time.Second):
t.Fatal("timeout when getting new subconn result")
} }
} }
} }
func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) { // waitForNewXDSClientWithEDSWatch makes sure that a new xdsClient is created
// with the provided name. It also makes sure that the newly created client
// registers an eds watcher.
func waitForNewXDSClientWithEDSWatch(t *testing.T, ch *testutils.Channel, wantName string) *fakexds.Client {
t.Helper()
val, err := ch.Receive()
if err != nil {
t.Fatalf("error when waiting for a new xds client: %v", err)
return nil
}
xdsC := val.(*fakexds.Client)
if xdsC.Name() != wantName {
t.Fatalf("xdsClient created to balancer: %v, want %v", xdsC.Name(), wantName)
return nil
}
_, err = xdsC.WaitForWatchEDS()
if err != nil {
t.Fatalf("xdsClient.WatchEDS failed with error: %v", err)
return nil
}
return xdsC
}
// waitForNewEDSLB makes sure that a new edsLB is created by the top-level
// edsBalancer.
func waitForNewEDSLB(t *testing.T, ch *testutils.Channel) *fakeEDSBalancer {
t.Helper()
val, err := ch.Receive()
if err != nil {
t.Fatalf("error when waiting for a new edsLB: %v", err)
return nil
}
return val.(*fakeEDSBalancer)
}
// setup overrides the functions which are used to create the xdsClient and the
// edsLB, creates fake versions of them and makes them available on the provided
// channels. The returned cancel function should be called by the test for
// cleanup.
func setup(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() {
origNewEDSBalancer := newEDSBalancer
newEDSBalancer = func(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface {
edsLB := newFakeEDSBalancer(cc, loadStore)
defer func() { edsLBCh.Send(edsLB) }()
return edsLB
}
origXdsClientNew := xdsclientNew
xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) {
xdsC := fakexds.NewClientWithName(opts.Config.BalancerName)
defer func() { xdsClientCh.Send(xdsC) }()
return xdsC, nil
}
return func() {
newEDSBalancer = origNewEDSBalancer
xdsclientNew = origXdsClientNew
}
}
// setupForFallback performs everything that setup does and in addition
// overrides the fallback startupTimeout to a small value to trigger fallback
// in tests.
func setupForFallback(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() {
cancel := setup(edsLBCh, xdsClientCh)
startupTimeout = 500 * time.Millisecond startupTimeout = 500 * time.Millisecond
originalNewEDSBalancer := newEDSBalancer return func() {
newEDSBalancer = newFakeEDSBalancer cancel()
defer func() {
startupTimeout = defaultTimeout startupTimeout = defaultTimeout
newEDSBalancer = originalNewEDSBalancer }
}() }
// TestXDSConfigBalancerNameUpdate verifies different scenarios where the
// balancer name in the lbConfig is updated.
//
// The test does the following:
// * Builds a new xds balancer.
// * Since there is no xDS server to respond to requests from the xds client
// (created as part of the xds balancer), we expect the fallback policy to
// kick in.
// * Repeatedly pushes new ClientConnState which specifies different
// balancerName in the lbConfig. We expect xdsClient objects to be created
// whenever the balancerName changes. We also expect a new edsLB to be created
// the first time the client receives an edsUpdate.
func (s) TestXDSConfigBalancerNameUpdate(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsClientCh := testutils.NewChannel()
cancel := setupForFallback(edsLBCh, xdsClientCh)
defer cancel()
builder := balancer.Get(edsName) builder := balancer.Get(edsName)
cc := newTestClientConn() cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer) edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
if !ok { if !ok {
t.Fatalf("unable to type assert to *edsBalancer") t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
} }
defer lb.Close() defer edsB.Close()
addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
lb.UpdateClientConnState(balancer.ClientConnState{ edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs}, ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: testLBConfigFooBar, BalancerConfig: testLBConfigFooBar,
}) })
// verify fallback takes over waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
select { // Verify that fallbackLB (fakeBalancerA) takes over, since the xdsClient
case nsc := <-cc.newSubConns: // receives no edsUpdate.
if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) { if err := cc.waitForNewSubConns(append(addrs, specialAddrForBalancerA)); err != nil {
t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA)) t.Fatal(err)
}
case <-time.After(2 * time.Second):
t.Fatalf("timeout when getting new subconn result")
} }
var cleanups []func()
defer func() {
for _, cleanup := range cleanups {
cleanup()
}
}()
// In the first iteration, an eds balancer takes over fallback balancer
// In the second iteration, a new xds client takes over previous one.
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
td, cleanup := fakexds.StartServer(t) balancerName := fmt.Sprintf("balancer-%d", i)
cleanups = append(cleanups, cleanup) edsB.UpdateClientConnState(balancer.ClientConnState{
workingLBConfig := &XDSConfig{ ResolverState: resolver.State{Addresses: addrs},
BalancerName: td.Address, BalancerConfig: &XDSConfig{
BalancerName: balancerName,
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
EDSServiceName: testEDSClusterName, EDSServiceName: testEDSClusterName,
} },
lb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: workingLBConfig,
}) })
td.ResponseChan <- &fakexds.Response{Resp: testEDSResp}
var j int xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, balancerName)
for j = 0; j < 10; j++ { xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
if edsLB := getLatestEdsBalancer(); edsLB != nil { // edsLB won't change between the two iterations
select { // In the first iteration, an edsLB takes over from the fallbackLB. In the
case gotEDS := <-edsLB.edsChan: // second iteration, a new xds client is created, but the same edsLB is used.
want, err := xdsclient.ParseEDSRespProto(testClusterLoadAssignment) if i == 0 {
if err != nil { if _, err := edsLBCh.Receive(); err != nil {
t.Fatalf("parsing wanted EDS response failed: %v", err) t.Fatalf("edsBalancer did not create edsLB after receiveing EDS update: %v, %d", err, i)
} }
if !cmp.Equal(gotEDS, want) { } else {
t.Fatalf("edsBalancer got eds: %v, want %v", gotEDS, testClusterLoadAssignment) if _, err := edsLBCh.Receive(); err == nil {
t.Fatal("edsBalancer created new edsLB when it was not expected to")
} }
case <-time.After(time.Second):
t.Fatal("haven't got EDS update after 1s")
}
break
}
time.Sleep(100 * time.Millisecond)
}
if j == 10 {
t.Fatal("edsBalancer instance has not been created or updated after 1s")
} }
} }
} }
// switch child policy, lb stays the same // TestXDSConfigChildPolicyUpdate verifies scenarios where the childPolicy
func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) { // section of the lbConfig is updated.
originalNewEDSBalancer := newEDSBalancer //
newEDSBalancer = newFakeEDSBalancer // The test does the following:
defer func() { // * Builds a new xds balancer.
newEDSBalancer = originalNewEDSBalancer // * Pushes a new ClientConnState with a childPolicy set to fakeBalancerA.
}() // Verifies that a new xdsClient is created. It then pushes a new edsUpdate
// through the fakexds client. Verifies that a new edsLB is created and it
// receives the expected childPolicy.
// * Pushes a new ClientConnState with a childPolicy set to fakeBalancerB.
// This time around, we expect no new xdsClient or edsLB to be created.
// Instead, we expect the existing edsLB to receive the new child policy.
func (s) TestXDSConfigChildPolicyUpdate(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsClientCh := testutils.NewChannel()
cancel := setup(edsLBCh, xdsClientCh)
defer cancel()
builder := balancer.Get(edsName) builder := balancer.Get(edsName)
cc := newTestClientConn() cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer) edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer)
if !ok { if !ok {
t.Fatalf("unable to type assert to *edsBalancer") t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
} }
defer lb.Close() defer edsB.Close()
var cleanups []func() edsB.UpdateClientConnState(balancer.ClientConnState{
defer func() { BalancerConfig: &XDSConfig{
for _, cleanup := range cleanups { BalancerName: testBalancerNameFooBar,
cleanup()
}
}()
for _, test := range []struct {
cfg *XDSConfig
responseToSend *xdspb.DiscoveryResponse
expectedChildPolicy *loadBalancingConfig
}{
{
cfg: &XDSConfig{
ChildPolicy: &loadBalancingConfig{ ChildPolicy: &loadBalancingConfig{
Name: fakeBalancerA, Name: fakeBalancerA,
Config: json.RawMessage("{}"), Config: json.RawMessage("{}"),
}, },
EDSServiceName: testEDSClusterName, EDSServiceName: testEDSClusterName,
}, },
responseToSend: testEDSResp, })
expectedChildPolicy: &loadBalancingConfig{ xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
edsLB := waitForNewEDSLB(t, edsLBCh)
edsLB.waitForChildPolicy(&loadBalancingConfig{
Name: string(fakeBalancerA), Name: string(fakeBalancerA),
Config: json.RawMessage(`{}`), Config: json.RawMessage(`{}`),
}, })
},
{ edsB.UpdateClientConnState(balancer.ClientConnState{
cfg: &XDSConfig{ BalancerConfig: &XDSConfig{
BalancerName: testBalancerNameFooBar,
ChildPolicy: &loadBalancingConfig{ ChildPolicy: &loadBalancingConfig{
Name: fakeBalancerB, Name: fakeBalancerB,
Config: json.RawMessage("{}"), Config: json.RawMessage("{}"),
}, },
EDSServiceName: testEDSClusterName, EDSServiceName: testEDSClusterName,
}, },
expectedChildPolicy: &loadBalancingConfig{ })
Name: string(fakeBalancerB), edsLB.waitForChildPolicy(&loadBalancingConfig{
Name: string(fakeBalancerB),
Config: json.RawMessage(`{}`), Config: json.RawMessage(`{}`),
}, })
},
} {
td, cleanup := fakexds.StartServer(t)
cleanups = append(cleanups, cleanup)
test.cfg.BalancerName = td.Address
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: test.cfg})
if test.responseToSend != nil {
td.ResponseChan <- &fakexds.Response{Resp: test.responseToSend}
}
var i int
for i = 0; i < 10; i++ {
if edsLB := getLatestEdsBalancer(); edsLB != nil {
select {
case childPolicy := <-edsLB.childPolicy:
if !reflect.DeepEqual(childPolicy, test.expectedChildPolicy) {
t.Fatalf("got childPolicy %v, want %v", childPolicy, test.expectedChildPolicy)
}
case <-time.After(time.Second):
t.Fatal("haven't got policy update after 1s")
}
break
}
time.Sleep(100 * time.Millisecond)
}
if i == 10 {
t.Fatal("edsBalancer instance has not been created or updated after 1s")
}
}
} }
// not in fallback mode, overwrite fallback info. // TestXDSConfigFallBackUpdate verifies different scenarios where the fallback
// in fallback mode, update config or switch balancer. // config part of the lbConfig is updated.
func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) { //
originalNewEDSBalancer := newEDSBalancer // The test does the following:
newEDSBalancer = newFakeEDSBalancer // * Builds a top-level edsBalancer
defer func() { // * Fakes the xdsClient and the underlying edsLB implementations.
newEDSBalancer = originalNewEDSBalancer // * Sends a ClientConn update to the edsBalancer with a bogus balancerName.
}() // This will get the balancer into fallback monitoring, but since the
// startupTimeout package variable is not overridden to a small value, fallback
testXDSClient := newFakeXDSClient() // will not kick in yet.
originalxdsclientNew := xdsclientNew // * Sends another ClientConn update with fallback addresses. Still fallback
xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) { // would not have kicked in because the startupTimeout hasn't expired.
return testXDSClient, nil // * Sends an EDSUpdate through the fakexds.Client object. This will trigger
} // the creation of an edsLB object. This is verified.
defer func() { // * Trigger fallback by directly calling the loseContact method on the
xdsclientNew = originalxdsclientNew // top-level edsBalancer. This should instantiate the fallbackLB and should
}() // send the appropriate subConns.
// * Update the fallback policy to specify a different fallback LB and make
// sure the new LB receives appropriate subConns.
func (s) TestXDSConfigFallBackUpdate(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsClientCh := testutils.NewChannel()
cancel := setup(edsLBCh, xdsClientCh)
defer cancel()
builder := balancer.Get(edsName) builder := balancer.Get(edsName)
cc := newTestClientConn() cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
if !ok { if !ok {
t.Fatalf("unable to type assert to *edsBalancer") t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
} }
defer lb.Close() defer edsB.Close()
cfg := XDSConfig{ bogusBalancerName := "wrong-balancer-name"
BalancerName: "wrong-balancer-name", edsB.UpdateClientConnState(balancer.ClientConnState{
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, BalancerConfig: &XDSConfig{
BalancerName: bogusBalancerName,
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
} },
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: &cfg}) })
xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, bogusBalancerName)
addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
cfg2 := cfg edsB.UpdateClientConnState(balancer.ClientConnState{
cfg2.FallBackPolicy = &loadBalancingConfig{Name: fakeBalancerB}
lb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs}, ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: &cfg2, BalancerConfig: &XDSConfig{
BalancerName: bogusBalancerName,
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerB},
},
}) })
// Callback with an EDS update, the balancer will build an EDS balancer, not xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
// a fallback. if _, err := edsLBCh.Receive(); err != nil {
testXDSClient.callEDSCallback(xdsclient.ParseEDSRespProtoForTesting(testClusterLoadAssignment), nil) t.Fatalf("edsBalancer did not create edsLB after receiving EDS update: %v", err)
var i int
for i = 0; i < 10; i++ {
if edsLB := getLatestEdsBalancer(); edsLB != nil {
break
}
time.Sleep(100 * time.Millisecond)
}
if i == 10 {
t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s")
} }
// Call loseContact explicitly, error in EDS callback is not handled. // Call loseContact explicitly, error in EDS callback is not handled.
// Eventually, this should call EDS callback with an error that indicates // Eventually, this should call EDS callback with an error that indicates
// "lost contact". // "lost contact".
lb.loseContact() edsB.loseContact()
// verify fallback balancer B takes over // Verify that fallback (fakeBalancerB) takes over.
select { if err := cc.waitForNewSubConns(append(addrs, specialAddrForBalancerB)); err != nil {
case nsc := <-cc.newSubConns: t.Fatal(err)
if !reflect.DeepEqual(append(addrs, specialAddrForBalancerB), nsc) {
t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerB))
}
case <-time.After(5 * time.Second):
t.Fatalf("timeout when getting new subconn result")
} }
cfg3 := cfg edsB.UpdateClientConnState(balancer.ClientConnState{
cfg3.FallBackPolicy = &loadBalancingConfig{Name: fakeBalancerA}
lb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs}, ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: &cfg3, BalancerConfig: &XDSConfig{
BalancerName: bogusBalancerName,
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
},
}) })
// verify fallback balancer A takes over // Verify that fallbackLB (fakeBalancerA) takes over.
select { if err := cc.waitForNewSubConns(append(addrs, specialAddrForBalancerA)); err != nil {
case nsc := <-cc.newSubConns: t.Fatal(err)
if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) {
t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA))
}
case <-time.After(2 * time.Second):
t.Fatalf("timeout when getting new subconn result")
} }
} }
func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) { // TestXDSSubConnStateChange verifies that the top-level edsBalancer passes on
originalNewEDSBalancer := newEDSBalancer // the subConnStateChange to appropriate child balancers (it tests for edsLB
newEDSBalancer = newFakeEDSBalancer // and a fallbackLB).
defer func() { func (s) TestXDSSubConnStateChange(t *testing.T) {
newEDSBalancer = originalNewEDSBalancer edsLBCh := testutils.NewChannel()
}() xdsClientCh := testutils.NewChannel()
cancel := setup(edsLBCh, xdsClientCh)
testXDSClient := newFakeXDSClient() defer cancel()
originalxdsclientNew := xdsclientNew
xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) {
return testXDSClient, nil
}
defer func() {
xdsclientNew = originalxdsclientNew
}()
builder := balancer.Get(edsName) builder := balancer.Get(edsName)
cc := newTestClientConn() cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
if !ok { if !ok {
t.Fatalf("unable to type assert to *edsBalancer") t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
} }
defer lb.Close() defer edsB.Close()
cfg := &XDSConfig{ addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
BalancerName: "wrong-balancer-name", edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: &XDSConfig{
BalancerName: testBalancerNameFooBar,
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
EDSServiceName: testEDSClusterName, EDSServiceName: testEDSClusterName,
} },
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg}) })
// Callback with an EDS update, the balancer will build an EDS balancer, not xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
// a fallback. xdsC.InvokeWatchEDSCallback(&xdsclient.EDSUpdate{}, nil)
testXDSClient.callEDSCallback(xdsclient.ParseEDSRespProtoForTesting(testClusterLoadAssignment), nil) edsLB := waitForNewEDSLB(t, edsLBCh)
expectedScStateChange := &scStateChange{ fsc := &fakeSubConn{}
sc: &fakeSubConn{}, state := connectivity.Ready
state: connectivity.Ready, edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state})
} edsLB.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state})
var i int // lbABuilder maintains a pointer to the last balancerA that it created. We
for i = 0; i < 10; i++ { // need to clear that to make sure a new one is created when we attempt to
if edsLB := getLatestEdsBalancer(); edsLB != nil { // fallback in the next line.
lb.UpdateSubConnState(expectedScStateChange.sc, balancer.SubConnState{ConnectivityState: expectedScStateChange.state})
select {
case scsc := <-edsLB.subconnStateChange:
if !reflect.DeepEqual(scsc, expectedScStateChange) {
t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
}
case <-time.After(time.Second):
t.Fatal("haven't got subconn state change after 1s")
}
break
}
time.Sleep(100 * time.Millisecond)
}
if i == 10 {
t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s")
}
// lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record
// to make sure there's a new one created and get the pointer to it.
lbABuilder.clearLastBalancer() lbABuilder.clearLastBalancer()
// Call loseContact explicitly, error in EDS callback is not handled. // Call loseContact explicitly, error in EDS callback is not handled.
// Eventually, this should call EDS callback with an error that indicates // Eventually, this should call EDS callback with an error that indicates
// "lost contact". // "lost contact".
lb.loseContact() edsB.loseContact()
// Verify that fallback (fakeBalancerA) takes over.
// switch to fallback if err := cc.waitForNewSubConns(append(addrs, specialAddrForBalancerA)); err != nil {
// fallback balancer A takes over t.Fatal(err)
for i = 0; i < 10; i++ {
if fblb := lbABuilder.getLastBalancer(); fblb != nil {
lb.UpdateSubConnState(expectedScStateChange.sc, balancer.SubConnState{ConnectivityState: expectedScStateChange.state})
select {
case scsc := <-fblb.subconnStateChange:
if !reflect.DeepEqual(scsc, expectedScStateChange) {
t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
} }
case <-time.After(time.Second): fblb := lbABuilder.getLastBalancer()
t.Fatal("haven't got subconn state change after 1s") if fblb == nil {
} t.Fatal("expected fallback balancerA to be built on fallback")
break
}
time.Sleep(100 * time.Millisecond)
}
if i == 10 {
t.Fatal("balancerA instance has not been created after 1s")
}
}
func (s) TestXdsBalancerFallBackSignalFromEdsBalancer(t *testing.T) {
originalNewEDSBalancer := newEDSBalancer
newEDSBalancer = newFakeEDSBalancer
defer func() {
newEDSBalancer = originalNewEDSBalancer
}()
testXDSClient := newFakeXDSClient()
originalxdsclientNew := xdsclientNew
xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) {
return testXDSClient, nil
}
defer func() {
xdsclientNew = originalxdsclientNew
}()
builder := balancer.Get(edsName)
cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
if !ok {
t.Fatalf("unable to type assert to *edsBalancer")
}
defer lb.Close()
cfg := &XDSConfig{
BalancerName: "wrong-balancer-name",
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
}
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg})
// Callback with an EDS update, the balancer will build an EDS balancer, not
// a fallback.
testXDSClient.callEDSCallback(xdsclient.ParseEDSRespProtoForTesting(testClusterLoadAssignment), nil)
expectedScStateChange := &scStateChange{
sc: &fakeSubConn{},
state: connectivity.Ready,
}
var i int
for i = 0; i < 10; i++ {
if edsLB := getLatestEdsBalancer(); edsLB != nil {
lb.UpdateSubConnState(expectedScStateChange.sc, balancer.SubConnState{ConnectivityState: expectedScStateChange.state})
select {
case scsc := <-edsLB.subconnStateChange:
if !reflect.DeepEqual(scsc, expectedScStateChange) {
t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
}
case <-time.After(time.Second):
t.Fatal("haven't got subconn state change after 1s")
}
break
}
time.Sleep(100 * time.Millisecond)
}
if i == 10 {
t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s")
}
// lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record
// to make sure there's a new one created and get the pointer to it.
lbABuilder.clearLastBalancer()
// Call loseContact explicitly, error in EDS callback is not handled.
// Eventually, this should call EDS callback with an error that indicates
// "lost contact".
lb.loseContact()
// switch to fallback
// fallback balancer A takes over
for i = 0; i < 10; i++ {
if fblb := lbABuilder.getLastBalancer(); fblb != nil {
lb.UpdateSubConnState(expectedScStateChange.sc, balancer.SubConnState{ConnectivityState: expectedScStateChange.state})
select {
case scsc := <-fblb.subconnStateChange:
if !reflect.DeepEqual(scsc, expectedScStateChange) {
t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
}
case <-time.After(time.Second):
t.Fatal("haven't got subconn state change after 1s")
}
break
}
time.Sleep(100 * time.Millisecond)
}
if i == 10 {
t.Fatal("balancerA instance has not been created after 1s")
} }
edsB.UpdateSubConnState(fsc, balancer.SubConnState{ConnectivityState: state})
fblb.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state})
} }
func TestXdsBalancerConfigParsing(t *testing.T) { func TestXdsBalancerConfigParsing(t *testing.T) {


@@ -66,5 +66,10 @@ func (cwt *Channel) Receive() (interface{}, error) {
// NewChannel returns a new Channel. // NewChannel returns a new Channel.
func NewChannel() *Channel { func NewChannel() *Channel {
return &Channel{ch: make(chan interface{}, DefaultChanBufferSize)} return NewChannelWithSize(DefaultChanBufferSize)
}
// NewChannelWithSize returns a new Channel with a buffer of bufSize.
func NewChannelWithSize(bufSize int) *Channel {
return &Channel{ch: make(chan interface{}, bufSize)}
} }
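The wait helpers added in the balancer test build directly on this Receive contract: receive, type-assert, compare. A minimal sketch of that shape follows; the helper name and the string payload are illustrative, only the `Receive() (interface{}, error)` surface shown above is assumed.

```go
// waitForString receives one value from ch and checks it against want.
// It relies only on the Receive() (interface{}, error) contract above;
// callers type-assert the received value, as the tests in this diff do.
func waitForString(ch *testutils.Channel, want string) error {
	val, err := ch.Receive()
	if err != nil {
		return fmt.Errorf("error waiting for value: %v", err)
	}
	if got := val.(string); got != want {
		return fmt.Errorf("got %q, want %q", got, want)
	}
	return nil
}
```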


@@ -21,6 +21,8 @@
package fakexds package fakexds
import ( import (
"sync"
"google.golang.org/grpc/xds/internal/balancer/lrs" "google.golang.org/grpc/xds/internal/balancer/lrs"
xdsclient "google.golang.org/grpc/xds/internal/client" xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/testutils"
@@ -29,18 +31,24 @@ import (
// Client is a fake implementation of an xds client. It exposes a bunch of // Client is a fake implementation of an xds client. It exposes a bunch of
// channels to signal the occurrence of various events. // channels to signal the occurrence of various events.
type Client struct { type Client struct {
serviceCb func(xdsclient.ServiceUpdate, error) name string
edsCb func(*xdsclient.EDSUpdate, error)
suWatchCh *testutils.Channel suWatchCh *testutils.Channel
edsWatchCh *testutils.Channel edsWatchCh *testutils.Channel
suCancelCh *testutils.Channel suCancelCh *testutils.Channel
edsCancelCh *testutils.Channel edsCancelCh *testutils.Channel
loadReportCh *testutils.Channel loadReportCh *testutils.Channel
closeCh *testutils.Channel closeCh *testutils.Channel
mu sync.Mutex
serviceCb func(xdsclient.ServiceUpdate, error)
edsCb func(*xdsclient.EDSUpdate, error)
} }
// WatchService registers a LDS/RDS watch. // WatchService registers a LDS/RDS watch.
func (xdsC *Client) WatchService(target string, callback func(xdsclient.ServiceUpdate, error)) func() { func (xdsC *Client) WatchService(target string, callback func(xdsclient.ServiceUpdate, error)) func() {
xdsC.mu.Lock()
defer xdsC.mu.Unlock()
xdsC.serviceCb = callback xdsC.serviceCb = callback
xdsC.suWatchCh.Send(target) xdsC.suWatchCh.Send(target)
return func() { return func() {
@@ -57,11 +65,17 @@ func (xdsC *Client) WaitForWatchService() (string, error) {
// InvokeWatchServiceCallback invokes the registered service watch callback. // InvokeWatchServiceCallback invokes the registered service watch callback.
func (xdsC *Client) InvokeWatchServiceCallback(cluster string, err error) { func (xdsC *Client) InvokeWatchServiceCallback(cluster string, err error) {
xdsC.mu.Lock()
defer xdsC.mu.Unlock()
xdsC.serviceCb(xdsclient.ServiceUpdate{Cluster: cluster}, err) xdsC.serviceCb(xdsclient.ServiceUpdate{Cluster: cluster}, err)
} }
// WatchEDS registers an EDS watch for provided clusterName. // WatchEDS registers an EDS watch for provided clusterName.
func (xdsC *Client) WatchEDS(clusterName string, callback func(*xdsclient.EDSUpdate, error)) (cancel func()) { func (xdsC *Client) WatchEDS(clusterName string, callback func(*xdsclient.EDSUpdate, error)) (cancel func()) {
xdsC.mu.Lock()
defer xdsC.mu.Unlock()
xdsC.edsCb = callback xdsC.edsCb = callback
xdsC.edsWatchCh.Send(clusterName) xdsC.edsWatchCh.Send(clusterName)
return func() { return func() {
@@ -78,6 +92,9 @@ func (xdsC *Client) WaitForWatchEDS() (string, error) {
// InvokeWatchEDSCallback invokes the registered edsWatch callback. // InvokeWatchEDSCallback invokes the registered edsWatch callback.
func (xdsC *Client) InvokeWatchEDSCallback(update *xdsclient.EDSUpdate, err error) { func (xdsC *Client) InvokeWatchEDSCallback(update *xdsclient.EDSUpdate, err error) {
xdsC.mu.Lock()
defer xdsC.mu.Unlock()
xdsC.edsCb(update, err) xdsC.edsCb(update, err)
} }
@@ -107,9 +124,22 @@ func (xdsC *Client) Close() {
xdsC.closeCh.Send(nil) xdsC.closeCh.Send(nil)
} }
// Name returns the name of the xds client.
func (xdsC *Client) Name() string {
return xdsC.name
}
// NewClient returns a new fake xds client. // NewClient returns a new fake xds client.
func NewClient() *Client { func NewClient() *Client {
return NewClientWithName("")
}
// NewClientWithName returns a new fake xds client with the provided name. This
// is used in cases where multiple clients are created in the tests and we need
// to make sure the client is created for the expected balancer name.
func NewClientWithName(name string) *Client {
return &Client{ return &Client{
name: name,
suWatchCh: testutils.NewChannel(), suWatchCh: testutils.NewChannel(),
edsWatchCh: testutils.NewChannel(), edsWatchCh: testutils.NewChannel(),
suCancelCh: testutils.NewChannel(), suCancelCh: testutils.NewChannel(),