From da3b1eb45ebefef5875d44383098283abbb61a47 Mon Sep 17 00:00:00 2001
From: Easwar Swaminathan
Date: Thu, 5 Dec 2019 09:52:43 -0800
Subject: [PATCH] xds: Implementation of the CDS LB policy. (#3224)

---
 .../balancer/cdsbalancer/cdsbalancer.go       | 350 +++++++++++
 .../balancer/cdsbalancer/cdsbalancer_test.go  | 556 ++++++++++++++++++
 xds/internal/balancer/xds.go                  |   5 +-
 3 files changed, 910 insertions(+), 1 deletion(-)
 create mode 100644 xds/internal/balancer/cdsbalancer/cdsbalancer.go
 create mode 100644 xds/internal/balancer/cdsbalancer/cdsbalancer_test.go

diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go
new file mode 100644
index 00000000..bc05345d
--- /dev/null
+++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go
@@ -0,0 +1,350 @@
+/*
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package cdsbalancer implements a balancer to handle CDS responses.
+package cdsbalancer
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"sync"
+
+	"google.golang.org/grpc/attributes"
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/internal/buffer"
+	"google.golang.org/grpc/resolver"
+	"google.golang.org/grpc/serviceconfig"
+
+	xdsinternal "google.golang.org/grpc/xds/internal"
+	xdsbalancer "google.golang.org/grpc/xds/internal/balancer"
+	xdsclient "google.golang.org/grpc/xds/internal/client"
+)
+
+const (
+	cdsName = "experimental_cds"
+	edsName = "experimental_eds"
+)
+
+var (
+	errBalancerClosed = errors.New("cdsBalancer is closed")
+
+	// newEDSBalancer is a helper function to build a new edsBalancer and will be
+	// overridden in unittests.
+	newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.V2Balancer {
+		builder := balancer.Get(edsName)
+		if builder == nil {
+			grpclog.Errorf("xds: no balancer builder with name %v", edsName)
+			return nil
+		}
+		// We directly pass the parent clientConn to the
+		// underlying edsBalancer because the cdsBalancer does
+		// not deal with subConns.
+		return builder.Build(cc, opts).(balancer.V2Balancer)
+	}
+)
+
+func init() {
+	balancer.Register(cdsBB{})
+}
+
+// cdsBB (short for cdsBalancerBuilder) implements the balancer.Builder
+// interface to help build a cdsBalancer.
+// It also implements the balancer.ConfigParser interface to help parse the
+// JSON service config, to be passed to the cdsBalancer.
+type cdsBB struct{}
+
+// Build creates a new CDS balancer with the ClientConn.
+func (cdsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
+	b := &cdsBalancer{
+		cc:       cc,
+		bOpts:    opts,
+		updateCh: buffer.NewUnbounded(),
+	}
+	go b.run()
+	return b
+}
+
+// Name returns the name of balancers built by this builder.
+func (cdsBB) Name() string {
+	return cdsName
+}
+
+// lbConfig represents the loadBalancingConfig section of the service config
+// for the cdsBalancer.
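+//
+// For illustration, a loadBalancingConfig entry selecting this policy for a
+// cluster named "cluster1" would look roughly like the following (the exact
+// shape is determined by the JSON tags below and is exercised in
+// cdsbalancer_test.go):
+//
+//	{"experimental_cds": {"Cluster": "cluster1"}}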
+type lbConfig struct {
+	serviceconfig.LoadBalancingConfig
+	ClusterName string `json:"Cluster"`
+}
+
+// ParseConfig parses the JSON load balancer config provided into an
+// internal form or returns an error if the config is invalid.
+func (cdsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+	var cfg lbConfig
+	if err := json.Unmarshal(c, &cfg); err != nil {
+		return nil, fmt.Errorf("xds: unable to unmarshal lbconfig: %s, error: %v", string(c), err)
+	}
+	return &cfg, nil
+}
+
+// xdsClientInterface contains methods from xdsClient.Client which are used by
+// the cdsBalancer. This will be faked out in unittests.
+type xdsClientInterface interface {
+	WatchCluster(string, func(xdsclient.CDSUpdate, error)) func()
+	Close()
+}
+
+// ccUpdate wraps a clientConn update received from gRPC (pushed from the
+// xdsResolver). A valid clusterName causes the cdsBalancer to register a CDS
+// watcher with the xdsClient, while a non-nil error causes it to cancel the
+// existing watch and propagate the error to the underlying edsBalancer.
+type ccUpdate struct {
+	client      xdsClientInterface
+	clusterName string
+	err         error
+}
+
+// scUpdate wraps a subConn update received from gRPC. This is directly passed
+// on to the edsBalancer.
+type scUpdate struct {
+	subConn balancer.SubConn
+	state   balancer.SubConnState
+}
+
+// watchUpdate wraps the information received from a registered CDS watcher. A
+// non-nil error is propagated to the underlying edsBalancer. A valid update
+// results in creating a new edsBalancer (if one doesn't already exist) and
+// pushing the update to it.
+type watchUpdate struct {
+	cds xdsclient.CDSUpdate
+	err error
+}
+
+// closeUpdate is an empty struct used to notify the run() goroutine that a
+// Close has been called on the balancer.
+type closeUpdate struct{}
+
+// cdsBalancer implements a CDS based LB policy. It instantiates an EDS based
+// LB policy to further resolve the serviceName received from CDS, into
+// localities and endpoints. Implements the balancer.Balancer interface which
+// is exposed to gRPC and implements the balancer.ClientConn interface which is
+// exposed to the edsBalancer.
+type cdsBalancer struct {
+	cc             balancer.ClientConn
+	bOpts          balancer.BuildOptions
+	updateCh       *buffer.Unbounded
+	client         xdsClientInterface
+	cancelWatch    func()
+	edsLB          balancer.V2Balancer
+	clusterToWatch string
+
+	// The only thing protected by this mutex is the closed boolean. This is
+	// checked by all methods before acting on updates.
+	mu     sync.Mutex
+	closed bool
+}
+
+// run is a long-running goroutine which handles all updates from gRPC. All
+// methods which are invoked directly by gRPC or xdsClient simply push an
+// update onto a channel which is read and acted upon right here.
+//
+// 1. Good clientConn updates lead to registration of a CDS watch. Updates with
+// error lead to cancellation of existing watch and propagation of the same
+// error to the edsBalancer.
+// 2. SubConn updates are passthrough and are simply handed over to the
+// underlying edsBalancer.
+// 3. Watch API updates lead to clientConn updates being invoked on the
+// underlying edsBalancer.
+// 4. Close results in cancellation of the CDS watch and closing of the
+// underlying edsBalancer and is the only way to exit this goroutine.
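+//
+// All producers follow the same pattern: the exported methods below simply
+// enqueue a typed update, roughly
+//
+//	b.updateCh.Put(&scUpdate{subConn: sc, state: state})
+//
+// (see UpdateSubConnState), and the corresponding case in the switch below
+// consumes it.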
+func (b *cdsBalancer) run() {
+	for {
+		u := <-b.updateCh.Get()
+		b.updateCh.Load()
+		switch update := u.(type) {
+		case *ccUpdate:
+			// We first handle errors, if any, and then proceed with handling
+			// the update, only if the status quo has changed.
+			if err := update.err; err != nil {
+				// TODO: Should we cancel the watch only on specific errors?
+				if b.cancelWatch != nil {
+					b.cancelWatch()
+				}
+				if b.edsLB != nil {
+					b.edsLB.ResolverError(err)
+				}
+			}
+			if b.client == update.client && b.clusterToWatch == update.clusterName {
+				break
+			}
+			if update.client != nil {
+				// Since the cdsBalancer doesn't own the xdsClient object, we
+				// don't have to bother about closing the old client here, but
+				// we still need to cancel the watch on the old client.
+				if b.cancelWatch != nil {
+					b.cancelWatch()
+				}
+				b.client = update.client
+			}
+			if update.clusterName != "" {
+				b.cancelWatch = b.client.WatchCluster(update.clusterName, b.handleClusterUpdate)
+				b.clusterToWatch = update.clusterName
+			}
+		case *scUpdate:
+			if b.edsLB == nil {
+				grpclog.Errorf("xds: received scUpdate {%+v} with no edsBalancer", update)
+				break
+			}
+			b.edsLB.UpdateSubConnState(update.subConn, update.state)
+		case *watchUpdate:
+			if err := update.err; err != nil {
+				if b.edsLB != nil {
+					b.edsLB.ResolverError(err)
+				}
+				break
+			}
+
+			// The first good update from the watch API leads to the
+			// instantiation of an edsBalancer. Further updates/errors are
+			// propagated to the existing edsBalancer.
+			if b.edsLB == nil {
+				b.edsLB = newEDSBalancer(b.cc, b.bOpts)
+				if b.edsLB == nil {
+					grpclog.Error("xds: failed to build edsBalancer")
+					break
+				}
+			}
+			lbCfg := &xdsbalancer.XDSConfig{EDSServiceName: update.cds.ServiceName}
+			if update.cds.EnableLRS {
+				// An empty string here indicates that the edsBalancer
+				// should use the same xDS server for load reporting as
+				// it does for EDS requests/responses.
+				lbCfg.LrsLoadReportingServerName = new(string)
+			}
+			ccState := balancer.ClientConnState{
+				ResolverState:  resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, b.client)},
+				BalancerConfig: lbCfg,
+			}
+			if err := b.edsLB.UpdateClientConnState(ccState); err != nil {
+				grpclog.Errorf("xds: edsBalancer.UpdateClientConnState(%+v) returned error: %v", ccState, err)
+			}
+		case *closeUpdate:
+			if b.cancelWatch != nil {
+				b.cancelWatch()
+				b.cancelWatch = nil
+			}
+			if b.edsLB != nil {
+				b.edsLB.Close()
+				b.edsLB = nil
+			}
+			// This is the *ONLY* point of return from this function.
+			return
+		}
+	}
+}
+
+// handleClusterUpdate is the CDS watch API callback. It simply pushes the
+// received information on to the update channel for run() to pick it up.
+func (b *cdsBalancer) handleClusterUpdate(cu xdsclient.CDSUpdate, err error) {
+	if b.isClosed() {
+		grpclog.Warningf("xds: received cluster update {%+v} after cdsBalancer was closed", cu)
+		return
+	}
+	b.updateCh.Put(&watchUpdate{cds: cu, err: err})
+}
+
+// UpdateClientConnState receives the serviceConfig (which contains the
+// clusterName to watch for in CDS) and the xdsClient object from the
+// xdsResolver.
+func (b *cdsBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
+	if b.isClosed() {
+		grpclog.Warningf("xds: received ClientConnState {%+v} after cdsBalancer was closed", state)
+		return errBalancerClosed
+	}
+
+	// The errors checked here should ideally never happen because the
+	// ServiceConfig in this case is prepared by the xdsResolver and is not
+	// something that is received on the wire.
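+	// A well-formed update carries a *lbConfig with a non-empty ClusterName,
+	// and an xdsClient under the xdsinternal.XDSClientID attribute; each of
+	// these expectations is validated in turn below.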
+	lbCfg, ok := state.BalancerConfig.(*lbConfig)
+	if !ok {
+		grpclog.Warningf("xds: unexpected LoadBalancingConfig type: %T", state.BalancerConfig)
+		return balancer.ErrBadResolverState
+	}
+	if lbCfg.ClusterName == "" {
+		grpclog.Warningf("xds: no clusterName found in LoadBalancingConfig: %+v", lbCfg)
+		return balancer.ErrBadResolverState
+	}
+	client := state.ResolverState.Attributes.Value(xdsinternal.XDSClientID)
+	if client == nil {
+		grpclog.Warning("xds: no xdsClient found in resolver state attributes")
+		return balancer.ErrBadResolverState
+	}
+	newClient, ok := client.(xdsClientInterface)
+	if !ok {
+		grpclog.Warningf("xds: unexpected xdsClient type: %T", client)
+		return balancer.ErrBadResolverState
+	}
+	b.updateCh.Put(&ccUpdate{client: newClient, clusterName: lbCfg.ClusterName})
+	return nil
+}
+
+// ResolverError handles errors reported by the xdsResolver.
+//
+// TODO: Make it possible to differentiate between connection errors and
+// resource not found errors.
+func (b *cdsBalancer) ResolverError(err error) {
+	if b.isClosed() {
+		grpclog.Warningf("xds: received resolver error {%v} after cdsBalancer was closed", err)
+		return
+	}
+
+	b.updateCh.Put(&ccUpdate{err: err})
+}
+
+// UpdateSubConnState handles subConn updates from gRPC.
+func (b *cdsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
+	if b.isClosed() {
+		grpclog.Warningf("xds: received subConn update {%v, %v} after cdsBalancer was closed", sc, state)
+		return
+	}
+	b.updateCh.Put(&scUpdate{subConn: sc, state: state})
+}
+
+// Close closes the cdsBalancer and the underlying edsBalancer.
+func (b *cdsBalancer) Close() {
+	b.mu.Lock()
+	b.closed = true
+	b.mu.Unlock()
+	b.updateCh.Put(&closeUpdate{})
+}
+
+func (b *cdsBalancer) isClosed() bool {
+	b.mu.Lock()
+	closed := b.closed
+	b.mu.Unlock()
+	return closed
+}
+
+func (b *cdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
+	grpclog.Error("UpdateSubConnState should be called instead of HandleSubConnStateChange")
+}
+
+func (b *cdsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
+	grpclog.Error("UpdateClientConnState should be called instead of HandleResolvedAddrs")
+}
diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
new file mode 100644
index 00000000..2d5a8222
--- /dev/null
+++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
@@ -0,0 +1,556 @@
+/*
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package cdsbalancer + +import ( + "encoding/json" + "errors" + "fmt" + "reflect" + "testing" + "time" + + "google.golang.org/grpc/attributes" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" + xdsinternal "google.golang.org/grpc/xds/internal" + xdsbalancer "google.golang.org/grpc/xds/internal/balancer" + xdsclient "google.golang.org/grpc/xds/internal/client" +) + +const ( + clusterName = "cluster1" + serviceName = "service1" + defaultTestTimeout = 2 * time.Second +) + +type testClientConn struct { + balancer.ClientConn +} + +// testXDSClient is fake implementation of the xdsClientInterface. It contains +// a bunch of channels to signal different events to the test. +type testXDSClient struct { + // watchCb is the watch API callback registered by the cdsBalancer. Used to + // pass different CDS updates to the balancer, from the test. + watchCb func(xdsclient.CDSUpdate, error) + // clusterCh is a channel used to signal the cluster name for which the + // watch API call was invoked on this client. + clusterCh chan string + // cancelWatchCh is a channel used to signal the cancellation of the + // registered watch API. + cancelWatchCh chan struct{} +} + +func newTestXDSClient() *testXDSClient { + return &testXDSClient{ + clusterCh: make(chan string, 1), + cancelWatchCh: make(chan struct{}, 1), + } +} +func (tc *testXDSClient) WatchCluster(clusterName string, callback func(xdsclient.CDSUpdate, error)) func() { + tc.watchCb = callback + tc.clusterCh <- clusterName + return tc.cancelWatch +} + +func (tc *testXDSClient) Close() {} + +func (tc *testXDSClient) cancelWatch() { + tc.cancelWatchCh <- struct{}{} +} + +// waitForWatch verifies if the testXDSClient receives a CDS watch API with the +// provided clusterName within a reasonable amount of time. +func (tc *testXDSClient) waitForWatch(wantCluster string) error { + timer := time.NewTimer(defaultTestTimeout) + select { + case <-timer.C: + return errors.New("Timeout when expecting CDS watch call") + case gotCluster := <-tc.clusterCh: + timer.Stop() + if gotCluster != wantCluster { + return fmt.Errorf("WatchCluster called with clusterName: %s, want %s", gotCluster, wantCluster) + } + return nil + } +} + +// waitForCancelWatch verifies if the CDS watch API is cancelled within a +// reasonable amount of time. +func (tc *testXDSClient) waitForCancelWatch() error { + timer := time.NewTimer(defaultTestTimeout) + select { + case <-timer.C: + return errors.New("Timeout when expecting CDS watch call to be cancelled") + case <-tc.cancelWatchCh: + timer.Stop() + return nil + } +} + +// cdsWatchInfo wraps the update and the error sent in a CDS watch callback. +type cdsWatchInfo struct { + update xdsclient.CDSUpdate + err error +} + +// invokeWatchCb invokes the CDS watch callback registered by the cdsBalancer +// and waits for appropriate state to be pushed to the provided edsBalancer. +func (tc *testXDSClient) invokeWatchCb(cdsW cdsWatchInfo, wantCCS balancer.ClientConnState, edsB *testEDSBalancer) error { + tc.watchCb(cdsW.update, cdsW.err) + if cdsW.err != nil { + return edsB.waitForResolverError(cdsW.err) + } + return edsB.waitForClientConnUpdate(wantCCS) +} + +// testEDSBalancer is a fake edsBalancer used to verify different actions from +// the cdsBalancer. It contains a bunch of channels to signal different events +// to the test. 
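+// The fake is injected by overriding the package-level newEDSBalancer hook,
+// as setup() below does, roughly:
+//
+//	newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.V2Balancer {
+//		return edsB
+//	}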
+type testEDSBalancer struct { + // ccsCh is a channel used to signal the receipt of a ClientConn update. + ccsCh chan balancer.ClientConnState + // scStateCh is a channel used to signal the receipt of a SubConn update. + scStateCh chan subConnWithState + // resolverErrCh is a channel used to signal a resolver error. + resolverErrCh chan error + // closeCh is a channel used to signal the closing of this balancer. + closeCh chan struct{} +} + +type subConnWithState struct { + sc balancer.SubConn + state balancer.SubConnState +} + +func newTestEDSBalancer() *testEDSBalancer { + return &testEDSBalancer{ + ccsCh: make(chan balancer.ClientConnState, 1), + scStateCh: make(chan subConnWithState, 1), + resolverErrCh: make(chan error, 1), + closeCh: make(chan struct{}, 1), + } +} + +func (tb *testEDSBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { + tb.ccsCh <- ccs + return nil +} + +func (tb *testEDSBalancer) ResolverError(err error) { + tb.resolverErrCh <- err +} + +func (tb *testEDSBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + tb.scStateCh <- subConnWithState{sc: sc, state: state} +} + +func (tb *testEDSBalancer) Close() { + tb.closeCh <- struct{}{} +} + +// waitForClientConnUpdate verifies if the testEDSBalancer receives the +// provided ClientConnState within a reasonable amount of time. +func (tb *testEDSBalancer) waitForClientConnUpdate(wantCCS balancer.ClientConnState) error { + timer := time.NewTimer(defaultTestTimeout) + select { + case <-timer.C: + return errors.New("Timeout when expecting ClientConn update on EDS balancer") + case gotCCS := <-tb.ccsCh: + timer.Stop() + if !reflect.DeepEqual(gotCCS, wantCCS) { + return fmt.Errorf("received ClientConnState: %+v, want %+v", gotCCS, wantCCS) + } + return nil + } +} + +// waitForSubConnUpdate verifies if the testEDSBalancer receives the provided +// SubConn update within a reasonable amount of time. +func (tb *testEDSBalancer) waitForSubConnUpdate(wantSCS subConnWithState) error { + timer := time.NewTimer(defaultTestTimeout) + select { + case <-timer.C: + return errors.New("Timeout when expecting SubConn update on EDS balancer") + case gotSCS := <-tb.scStateCh: + timer.Stop() + if !reflect.DeepEqual(gotSCS, wantSCS) { + return fmt.Errorf("received SubConnState: %+v, want %+v", gotSCS, wantSCS) + } + return nil + } +} + +// waitForResolverError verifies if the testEDSBalancer receives the +// provided resolver error within a reasonable amount of time. +func (tb *testEDSBalancer) waitForResolverError(wantErr error) error { + timer := time.NewTimer(defaultTestTimeout) + select { + case <-timer.C: + return errors.New("Timeout when expecting a resolver error") + case gotErr := <-tb.resolverErrCh: + timer.Stop() + if gotErr != wantErr { + return fmt.Errorf("received resolver error: %v, want %v", gotErr, wantErr) + } + return nil + } +} + +// waitForClose verifies that the edsBalancer is closed with a reasonable +// amount of time. +func (tb *testEDSBalancer) waitForClose() error { + timer := time.NewTimer(defaultTestTimeout) + select { + case <-timer.C: + return errors.New("Timeout when expecting a close") + case <-tb.closeCh: + timer.Stop() + return nil + } +} + +// cdsCCS is a helper function to construct a good update passed from the +// xdsResolver to the cdsBalancer. 
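+// Together with edsCCS below, it brackets the balancer under test: cdsCCS
+// models the input received from the xdsResolver, while edsCCS models the
+// output expected by the fake edsBalancer.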
+func cdsCCS(cluster string, xdsClient interface{}) balancer.ClientConnState { + const cdsLBConfig = `{ + "loadBalancingConfig":[ + { + "experimental_cds":{ + "Cluster": %s + } + } + ] + }` + jsonSC := fmt.Sprintf(cdsLBConfig, cluster) + return balancer.ClientConnState{ + ResolverState: resolver.State{ + ServiceConfig: internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(jsonSC), + Attributes: attributes.New(xdsinternal.XDSClientID, xdsClient), + }, + BalancerConfig: &lbConfig{ClusterName: clusterName}, + } +} + +// edsCCS is a helper function to construct a good update passed from the +// cdsBalancer to the edsBalancer. +func edsCCS(service string, enableLRS bool, xdsClient interface{}) balancer.ClientConnState { + lbCfg := &xdsbalancer.XDSConfig{EDSServiceName: service} + if enableLRS { + lbCfg.LrsLoadReportingServerName = new(string) + } + return balancer.ClientConnState{ + ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsClient)}, + BalancerConfig: lbCfg, + } +} + +// setup creates a cdsBalancer and an edsBalancer (and overrides the +// newEDSBalancer function to return it), and also returns a cleanup function. +func setup() (*cdsBalancer, *testEDSBalancer, func()) { + builder := cdsBB{} + tcc := &testClientConn{} + cdsB := builder.Build(tcc, balancer.BuildOptions{}).(balancer.V2Balancer) + + edsB := newTestEDSBalancer() + oldEDSBalancerBuilder := newEDSBalancer + newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.V2Balancer { + return edsB + } + + return cdsB.(*cdsBalancer), edsB, func() { + newEDSBalancer = oldEDSBalancerBuilder + } +} + +// setupWithWatch does everything that setup does, and also pushes a ClientConn +// update to the cdsBalancer and waits for a CDS watch call to be registered. +func setupWithWatch(t *testing.T) (*testXDSClient, *cdsBalancer, *testEDSBalancer, func()) { + t.Helper() + + xdsC := newTestXDSClient() + cdsB, edsB, cancel := setup() + if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil { + t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err) + } + if err := xdsC.waitForWatch(clusterName); err != nil { + t.Fatal(err) + } + return xdsC, cdsB, edsB, cancel +} + +// TestUpdateClientConnState invokes the UpdateClientConnState method on the +// cdsBalancer with different inputs and verifies that the CDS watch API on the +// provided xdsClient is invoked appropriately. 
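+// Only the happy case goes through cdsCCS; the error cases hand-construct a
+// ClientConnState so that each validation in UpdateClientConnState is
+// triggered in turn.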
+func TestUpdateClientConnState(t *testing.T) { + xdsC := newTestXDSClient() + + tests := []struct { + name string + ccs balancer.ClientConnState + wantErr error + wantCluster string + }{ + { + name: "bad-lbCfg-type", + ccs: balancer.ClientConnState{BalancerConfig: nil}, + wantErr: balancer.ErrBadResolverState, + }, + { + name: "empty-cluster-in-lbCfg", + ccs: balancer.ClientConnState{BalancerConfig: &lbConfig{ClusterName: ""}}, + wantErr: balancer.ErrBadResolverState, + }, + { + name: "no-xdsClient-in-attributes", + ccs: balancer.ClientConnState{ + ResolverState: resolver.State{ + Attributes: attributes.New("key", "value"), + }, + BalancerConfig: &lbConfig{ClusterName: clusterName}, + }, + wantErr: balancer.ErrBadResolverState, + }, + { + name: "bad-xdsClient-in-attributes", + ccs: balancer.ClientConnState{ + ResolverState: resolver.State{ + Attributes: attributes.New(xdsinternal.XDSClientID, "value"), + }, + BalancerConfig: &lbConfig{ClusterName: clusterName}, + }, + wantErr: balancer.ErrBadResolverState, + }, + { + name: "happy-good-case", + ccs: cdsCCS(clusterName, xdsC), + wantCluster: clusterName, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + cdsB, _, cancel := setup() + defer func() { + cancel() + cdsB.Close() + }() + + if err := cdsB.UpdateClientConnState(test.ccs); err != test.wantErr { + t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err) + } + if test.wantErr != nil { + // When we wanted an error and got it, we should return early. + return + } + if err := xdsC.waitForWatch(test.wantCluster); err != nil { + t.Fatal(err) + } + }) + } +} + +// TestUpdateClientConnStateAfterClose invokes the UpdateClientConnState method +// on the cdsBalancer after close and verifies that it returns an error. +func TestUpdateClientConnStateAfterClose(t *testing.T) { + cdsB, _, cancel := setup() + defer cancel() + cdsB.Close() + + if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, newTestXDSClient())); err != errBalancerClosed { + t.Fatalf("UpdateClientConnState() after close returned %v, want %v", err, errBalancerClosed) + } +} + +// TestUpdateClientConnStateWithSameState verifies that a ClientConnState +// update with the same cluster and xdsClient does not cause the cdsBalancer to +// create a new watch. +func TestUpdateClientConnStateWithSameState(t *testing.T) { + xdsC, cdsB, _, cancel := setupWithWatch(t) + defer func() { + cancel() + cdsB.Close() + }() + + if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil { + t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err) + } + if err := xdsC.waitForWatch(clusterName); err == nil { + t.Fatal("Waiting for WatchCluster() should have timed out, but returned with nil error") + } +} + +// TestHandleClusterUpdate invokes the registered CDS watch callback with +// different updates and verifies that the expect ClientConnState is propagated +// to the edsBalancer. 
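+// Each sub-test drives the callback registered via WatchCluster and then
+// asserts on what the fake edsBalancer received, essentially:
+//
+//	xdsC.watchCb(xdsclient.CDSUpdate{ServiceName: serviceName}, nil)
+//	edsB.waitForClientConnUpdate(edsCCS(serviceName, false, xdsC))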
+func TestHandleClusterUpdate(t *testing.T) { + xdsC, cdsB, edsB, cancel := setupWithWatch(t) + defer func() { + cancel() + cdsB.Close() + }() + + tests := []struct { + name string + cdsUpdate xdsclient.CDSUpdate + updateErr error + wantCCS balancer.ClientConnState + }{ + { + name: "happy-case-with-lrs", + cdsUpdate: xdsclient.CDSUpdate{ServiceName: serviceName, EnableLRS: true}, + wantCCS: edsCCS(serviceName, true, xdsC), + }, + { + name: "happy-case-without-lrs", + cdsUpdate: xdsclient.CDSUpdate{ServiceName: serviceName}, + wantCCS: edsCCS(serviceName, false, xdsC), + }, + { + name: "cdsWatch-returns-error", + updateErr: errors.New("cdsUpdate error"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if err := xdsC.invokeWatchCb(cdsWatchInfo{test.cdsUpdate, test.updateErr}, test.wantCCS, edsB); err != nil { + t.Fatal(err) + } + }) + } +} + +// TestResolverError verifies that an existing watch is cancelled when a +// resolver error is received by the cdsBalancer, and also that the same error +// is propagated to the edsBalancer. +func TestResolverError(t *testing.T) { + xdsC, cdsB, edsB, cancel := setupWithWatch(t) + defer func() { + cancel() + cdsB.Close() + }() + + cdsUpdate := xdsclient.CDSUpdate{ServiceName: serviceName} + wantCCS := edsCCS(serviceName, false, xdsC) + if err := xdsC.invokeWatchCb(cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { + t.Fatal(err) + } + + rErr := errors.New("cdsBalancer resolver error") + cdsB.ResolverError(rErr) + if err := xdsC.waitForCancelWatch(); err != nil { + t.Fatal(err) + } + if err := edsB.waitForResolverError(rErr); err != nil { + t.Fatal(err) + } +} + +// TestUpdateSubConnState pushes a SubConn update to the cdsBalancer and +// verifies that the update is propagated to the edsBalancer. +func TestUpdateSubConnState(t *testing.T) { + xdsC, cdsB, edsB, cancel := setupWithWatch(t) + defer func() { + cancel() + cdsB.Close() + }() + + cdsUpdate := xdsclient.CDSUpdate{ServiceName: serviceName} + wantCCS := edsCCS(serviceName, false, xdsC) + if err := xdsC.invokeWatchCb(cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { + t.Fatal(err) + } + + var sc balancer.SubConn + state := balancer.SubConnState{ConnectivityState: connectivity.Ready} + cdsB.UpdateSubConnState(sc, state) + if err := edsB.waitForSubConnUpdate(subConnWithState{sc: sc, state: state}); err != nil { + t.Fatal(err) + } +} + +// TestClose calls Close() on the cdsBalancer, and verifies that the underlying +// edsBalancer is also closed. +func TestClose(t *testing.T) { + xdsC, cdsB, edsB, cancel := setupWithWatch(t) + defer cancel() + + cdsUpdate := xdsclient.CDSUpdate{ServiceName: serviceName} + wantCCS := edsCCS(serviceName, false, xdsC) + if err := xdsC.invokeWatchCb(cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { + t.Fatal(err) + } + + cdsB.Close() + if err := xdsC.waitForCancelWatch(); err != nil { + t.Fatal(err) + } + if err := edsB.waitForClose(); err != nil { + t.Fatal(err) + } +} + +// TestParseConfig exercises the config parsing functionality in the cds +// balancer builder. 
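+// For reference, the good case below maps
+//
+//	{"Cluster": "cluster1"}
+//
+// to &lbConfig{ClusterName: "cluster1"}; unknown fields are ignored, and an
+// empty input is rejected with an error.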
+func TestParseConfig(t *testing.T) { + bb := cdsBB{} + if gotName := bb.Name(); gotName != cdsName { + t.Fatalf("cdsBB.Name() = %v, want %v", gotName, cdsName) + } + + tests := []struct { + name string + input json.RawMessage + wantCfg serviceconfig.LoadBalancingConfig + wantErr bool + }{ + { + name: "good-lb-config", + input: json.RawMessage(`{"Cluster": "cluster1"}`), + wantCfg: &lbConfig{ClusterName: clusterName}, + }, + { + name: "unknown-fields-in-lb-config", + input: json.RawMessage(`{"Unknown": "foobar"}`), + wantCfg: &lbConfig{ClusterName: ""}, + }, + { + name: "empty-lb-config", + input: json.RawMessage(""), + wantErr: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + gotCfg, gotErr := bb.ParseConfig(test.input) + if (gotErr != nil) != test.wantErr { + t.Fatalf("bb.ParseConfig(%v) = %v, wantErr %v", string(test.input), gotErr, test.wantErr) + } + if !test.wantErr { + if !reflect.DeepEqual(gotCfg, test.wantCfg) { + t.Fatalf("bb.ParseConfig(%v) = %v, want %v", string(test.input), gotCfg, test.wantCfg) + } + } + }) + } +} diff --git a/xds/internal/balancer/xds.go b/xds/internal/balancer/xds.go index b2685400..8041dfd6 100644 --- a/xds/internal/balancer/xds.go +++ b/xds/internal/balancer/xds.go @@ -330,7 +330,10 @@ func (x *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Sub } func (x *edsBalancer) ResolverError(error) { - // Ignore for now + // TODO: Need to distinguish between connection errors and resource removed + // errors. For the former, we will need to handle it later on for fallback. + // For the latter, handle it by stopping the watch, closing sub-balancers + // and pickers. } func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {