xds: Implementation of the CDS LB policy. (#3224)
This commit is contained in:

committed by
GitHub

parent
6f8ce09297
commit
da3b1eb45e
350
xds/internal/balancer/cdsbalancer/cdsbalancer.go
Normal file
350
xds/internal/balancer/cdsbalancer/cdsbalancer.go
Normal file
@ -0,0 +1,350 @@
|
||||
/*
|
||||
* Copyright 2019 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
// Package cdsbalancer implements a balancer to handle CDS responses.
|
||||
package cdsbalancer
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"google.golang.org/grpc/attributes"
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/internal/buffer"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/serviceconfig"
|
||||
|
||||
xdsinternal "google.golang.org/grpc/xds/internal"
|
||||
xdsbalancer "google.golang.org/grpc/xds/internal/balancer"
|
||||
xdsclient "google.golang.org/grpc/xds/internal/client"
|
||||
)
|
||||
|
||||
const (
|
||||
cdsName = "experimental_cds"
|
||||
edsName = "experimental_eds"
|
||||
)
|
||||
|
||||
var (
|
||||
errBalancerClosed = errors.New("cdsBalancer is closed")
|
||||
|
||||
// newEDSBalancer is a helper function to build a new edsBalancer and will be
|
||||
// overridden in unittests.
|
||||
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.V2Balancer {
|
||||
builder := balancer.Get(edsName)
|
||||
if builder == nil {
|
||||
grpclog.Errorf("xds: no balancer builder with name %v", edsName)
|
||||
return nil
|
||||
}
|
||||
// We directly pass the parent clientConn to the
|
||||
// underlying edsBalancer because the cdsBalancer does
|
||||
// not deal with subConns.
|
||||
return builder.Build(cc, opts).(balancer.V2Balancer)
|
||||
}
|
||||
)
|
||||
|
||||
func init() {
|
||||
balancer.Register(cdsBB{})
|
||||
}
|
||||
|
||||
// cdsBB (short for cdsBalancerBuilder) implements the balancer.Builder
|
||||
// interface to help build a cdsBalancer.
|
||||
// It also implements the balancer.ConfigParser interface to help parse the
|
||||
// JSON service config, to be passed to the cdsBalancer.
|
||||
type cdsBB struct{}
|
||||
|
||||
// Build creates a new CDS balancer with the ClientConn.
|
||||
func (cdsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
|
||||
b := &cdsBalancer{
|
||||
cc: cc,
|
||||
bOpts: opts,
|
||||
updateCh: buffer.NewUnbounded(),
|
||||
}
|
||||
go b.run()
|
||||
return b
|
||||
}
|
||||
|
||||
// Name returns the name of balancers built by this builder.
|
||||
func (cdsBB) Name() string {
|
||||
return cdsName
|
||||
}
|
||||
|
||||
// lbConfig represents the loadBalancingConfig section of the service config
|
||||
// for the cdsBalancer.
|
||||
type lbConfig struct {
|
||||
serviceconfig.LoadBalancingConfig
|
||||
ClusterName string `json:"Cluster"`
|
||||
}
|
||||
|
||||
// ParseConfig parses the JSON load balancer config provided into an
|
||||
// internal form or returns an error if the config is invalid.
|
||||
func (cdsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
|
||||
var cfg lbConfig
|
||||
if err := json.Unmarshal(c, &cfg); err != nil {
|
||||
return nil, fmt.Errorf("xds: unable to unmarshal lbconfig: %s, error: %v", string(c), err)
|
||||
}
|
||||
return &cfg, nil
|
||||
}
|
||||
|
||||
// xdsClientInterface contains methods from xdsClient.Client which are used by
|
||||
// the cdsBalancer. This will be faked out in unittests.
|
||||
type xdsClientInterface interface {
|
||||
WatchCluster(string, func(xdsclient.CDSUpdate, error)) func()
|
||||
Close()
|
||||
}
|
||||
|
||||
// ccUpdate wraps a clientConn update received from gRPC (pushed from the
|
||||
// xdsResolver). A valid clusterName causes the cdsBalancer to register a CDS
|
||||
// watcher with the xdsClient, while a non-nil error causes it to cancel the
|
||||
// existing watch and propagate the error to the underlying edsBalancer.
|
||||
type ccUpdate struct {
|
||||
client xdsClientInterface
|
||||
clusterName string
|
||||
err error
|
||||
}
|
||||
|
||||
// scUpdate wraps a subConn update received from gRPC. This is directly passed
|
||||
// on to the edsBalancer.
|
||||
type scUpdate struct {
|
||||
subConn balancer.SubConn
|
||||
state balancer.SubConnState
|
||||
}
|
||||
|
||||
// watchUpdate wraps the information received from a registered CDS watcher. A
|
||||
// non-nil error is propagated to the underlying edsBalancer. A valid update
|
||||
// results in creating a new edsBalancer (if one doesn't already exist) and
|
||||
// pushing the update to it.
|
||||
type watchUpdate struct {
|
||||
cds xdsclient.CDSUpdate
|
||||
err error
|
||||
}
|
||||
|
||||
// closeUpdate is an empty struct used to notify the run() goroutine that a
|
||||
// Close has been called on the balancer.
|
||||
type closeUpdate struct{}
|
||||
|
||||
// cdsBalancer implements a CDS based LB policy. It instantiates an EDS based
|
||||
// LB policy to further resolve the serviceName received from CDS, into
|
||||
// localities and endpoints. Implements the balancer.Balancer interface which
|
||||
// is exposed to gRPC and implements the balancer.ClientConn interface which is
|
||||
// exposed to the edsBalancer.
|
||||
type cdsBalancer struct {
|
||||
cc balancer.ClientConn
|
||||
bOpts balancer.BuildOptions
|
||||
updateCh *buffer.Unbounded
|
||||
client xdsClientInterface
|
||||
cancelWatch func()
|
||||
edsLB balancer.V2Balancer
|
||||
clusterToWatch string
|
||||
|
||||
// The only thing protected by this mutex is the closed boolean. This is
|
||||
// checked by all methods before acting on updates.
|
||||
mu sync.Mutex
|
||||
closed bool
|
||||
}
|
||||
|
||||
// run is a long-running goroutine which handles all updates from gRPC. All
|
||||
// methods which are invoked directly by gRPC or xdsClient simply push an
|
||||
// update onto a channel which is read and acted upon right here.
|
||||
//
|
||||
// 1. Good clientConn updates lead to registration of a CDS watch. Updates with
|
||||
// error lead to cancellation of existing watch and propagation of the same
|
||||
// error to the edsBalancer.
|
||||
// 2. SubConn updates are passthrough and are simply handed over to the
|
||||
// underlying edsBalancer.
|
||||
// 3. Watch API updates lead to clientConn updates being invoked on the
|
||||
// underlying edsBalancer.
|
||||
// 4. Close results in cancellation of the CDS watch and closing of the
|
||||
// underlying edsBalancer and is the only way to exit this goroutine.
|
||||
func (b *cdsBalancer) run() {
|
||||
for {
|
||||
u := <-b.updateCh.Get()
|
||||
b.updateCh.Load()
|
||||
switch update := u.(type) {
|
||||
case *ccUpdate:
|
||||
// We first handle errors, if any, and then proceed with handling
|
||||
// the update, only if the status quo has changed.
|
||||
if err := update.err; err != nil {
|
||||
// TODO: Should we cancel the watch only on specific errors?
|
||||
if b.cancelWatch != nil {
|
||||
b.cancelWatch()
|
||||
}
|
||||
if b.edsLB != nil {
|
||||
b.edsLB.ResolverError(err)
|
||||
}
|
||||
}
|
||||
if b.client == update.client && b.clusterToWatch == update.clusterName {
|
||||
break
|
||||
}
|
||||
if update.client != nil {
|
||||
// Since the cdsBalancer doesn't own the xdsClient object, we
|
||||
// don't have to bother about closing the old client here, but
|
||||
// we still need to cancel the watch on the old client.
|
||||
if b.cancelWatch != nil {
|
||||
b.cancelWatch()
|
||||
}
|
||||
b.client = update.client
|
||||
}
|
||||
if update.clusterName != "" {
|
||||
b.cancelWatch = b.client.WatchCluster(update.clusterName, b.handleClusterUpdate)
|
||||
b.clusterToWatch = update.clusterName
|
||||
}
|
||||
case *scUpdate:
|
||||
if b.edsLB == nil {
|
||||
grpclog.Errorf("xds: received scUpdate {%+v} with no edsBalancer", update)
|
||||
break
|
||||
}
|
||||
b.edsLB.UpdateSubConnState(update.subConn, update.state)
|
||||
case *watchUpdate:
|
||||
if err := update.err; err != nil {
|
||||
if b.edsLB != nil {
|
||||
b.edsLB.ResolverError(err)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
// The first good update from the watch API leads to the
|
||||
// instantiation of an edsBalancer. Further updates/errors are
|
||||
// propagated to the existing edsBalancer.
|
||||
if b.edsLB == nil {
|
||||
b.edsLB = newEDSBalancer(b.cc, b.bOpts)
|
||||
if b.edsLB == nil {
|
||||
grpclog.Error("xds: failed to build edsBalancer")
|
||||
break
|
||||
}
|
||||
}
|
||||
lbCfg := &xdsbalancer.XDSConfig{EDSServiceName: update.cds.ServiceName}
|
||||
if update.cds.EnableLRS {
|
||||
// An empty string here indicates that the edsBalancer
|
||||
// should use the same xDS server for load reporting as
|
||||
// it does for EDS requests/responses.
|
||||
lbCfg.LrsLoadReportingServerName = new(string)
|
||||
|
||||
}
|
||||
ccState := balancer.ClientConnState{
|
||||
ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, b.client)},
|
||||
BalancerConfig: lbCfg,
|
||||
}
|
||||
if err := b.edsLB.UpdateClientConnState(ccState); err != nil {
|
||||
grpclog.Errorf("xds: edsBalancer.UpdateClientConnState(%+v) returned error: %v", ccState, err)
|
||||
}
|
||||
case *closeUpdate:
|
||||
if b.cancelWatch != nil {
|
||||
b.cancelWatch()
|
||||
b.cancelWatch = nil
|
||||
}
|
||||
if b.edsLB != nil {
|
||||
b.edsLB.Close()
|
||||
b.edsLB = nil
|
||||
}
|
||||
// This is the *ONLY* point of return from this function.
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleClusterUpdate is the CDS watch API callback. It simply pushes the
|
||||
// received information on to the update channel for run() to pick it up.
|
||||
func (b *cdsBalancer) handleClusterUpdate(cu xdsclient.CDSUpdate, err error) {
|
||||
if b.isClosed() {
|
||||
grpclog.Warningf("xds: received cluster update {%+v} after cdsBalancer was closed", cu)
|
||||
return
|
||||
}
|
||||
b.updateCh.Put(&watchUpdate{cds: cu, err: err})
|
||||
}
|
||||
|
||||
// UpdateClientConnState receives the serviceConfig (which contains the
|
||||
// clusterName to watch for in CDS) and the xdsClient object from the
|
||||
// xdsResolver.
|
||||
func (b *cdsBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
|
||||
if b.isClosed() {
|
||||
grpclog.Warningf("xds: received ClientConnState {%+v} after cdsBalancer was closed", state)
|
||||
return errBalancerClosed
|
||||
}
|
||||
|
||||
// The errors checked here should ideally never happen because the
|
||||
// ServiceConfig in this case is prepared by the xdsResolver and is not
|
||||
// something that is received on the wire.
|
||||
lbCfg, ok := state.BalancerConfig.(*lbConfig)
|
||||
if !ok {
|
||||
grpclog.Warningf("xds: unexpected LoadBalancingConfig type: %T", state.BalancerConfig)
|
||||
return balancer.ErrBadResolverState
|
||||
}
|
||||
if lbCfg.ClusterName == "" {
|
||||
grpclog.Warning("xds: no clusterName found in LoadBalancingConfig: %+v", lbCfg)
|
||||
return balancer.ErrBadResolverState
|
||||
}
|
||||
client := state.ResolverState.Attributes.Value(xdsinternal.XDSClientID)
|
||||
if client == nil {
|
||||
grpclog.Warning("xds: no xdsClient found in resolver state attributes")
|
||||
return balancer.ErrBadResolverState
|
||||
}
|
||||
newClient, ok := client.(xdsClientInterface)
|
||||
if !ok {
|
||||
grpclog.Warningf("xds: unexpected xdsClient type: %T", client)
|
||||
return balancer.ErrBadResolverState
|
||||
}
|
||||
b.updateCh.Put(&ccUpdate{client: newClient, clusterName: lbCfg.ClusterName})
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResolverError handles errors reported by the xdsResolver.
|
||||
//
|
||||
// TODO: Make it possible to differentiate between connection errors and
|
||||
// resource not found errors.
|
||||
func (b *cdsBalancer) ResolverError(err error) {
|
||||
if b.isClosed() {
|
||||
grpclog.Warningf("xds: received resolver error {%v} after cdsBalancer was closed", err)
|
||||
return
|
||||
}
|
||||
|
||||
b.updateCh.Put(&ccUpdate{err: err})
|
||||
}
|
||||
|
||||
// UpdateSubConnState handles subConn updates from gRPC.
|
||||
func (b *cdsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
|
||||
if b.isClosed() {
|
||||
grpclog.Warningf("xds: received subConn update {%v, %v} after cdsBalancer was closed", sc, state)
|
||||
return
|
||||
}
|
||||
b.updateCh.Put(&scUpdate{subConn: sc, state: state})
|
||||
}
|
||||
|
||||
// Close closes the cdsBalancer and the underlying edsBalancer.
|
||||
func (b *cdsBalancer) Close() {
|
||||
b.mu.Lock()
|
||||
b.closed = true
|
||||
b.mu.Unlock()
|
||||
b.updateCh.Put(&closeUpdate{})
|
||||
}
|
||||
|
||||
func (b *cdsBalancer) isClosed() bool {
|
||||
b.mu.Lock()
|
||||
closed := b.closed
|
||||
b.mu.Unlock()
|
||||
return closed
|
||||
}
|
||||
|
||||
func (b *cdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
|
||||
grpclog.Error("UpdateSubConnState should be called instead of HandleSubConnStateChange")
|
||||
}
|
||||
|
||||
func (b *cdsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
|
||||
grpclog.Error("UpdateClientConnState should be called instead of HandleResolvedAddrs")
|
||||
}
|
556
xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Normal file
556
xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Normal file
@ -0,0 +1,556 @@
|
||||
/*
|
||||
* Copyright 2019 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cdsbalancer
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/attributes"
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/serviceconfig"
|
||||
xdsinternal "google.golang.org/grpc/xds/internal"
|
||||
xdsbalancer "google.golang.org/grpc/xds/internal/balancer"
|
||||
xdsclient "google.golang.org/grpc/xds/internal/client"
|
||||
)
|
||||
|
||||
const (
|
||||
clusterName = "cluster1"
|
||||
serviceName = "service1"
|
||||
defaultTestTimeout = 2 * time.Second
|
||||
)
|
||||
|
||||
type testClientConn struct {
|
||||
balancer.ClientConn
|
||||
}
|
||||
|
||||
// testXDSClient is fake implementation of the xdsClientInterface. It contains
|
||||
// a bunch of channels to signal different events to the test.
|
||||
type testXDSClient struct {
|
||||
// watchCb is the watch API callback registered by the cdsBalancer. Used to
|
||||
// pass different CDS updates to the balancer, from the test.
|
||||
watchCb func(xdsclient.CDSUpdate, error)
|
||||
// clusterCh is a channel used to signal the cluster name for which the
|
||||
// watch API call was invoked on this client.
|
||||
clusterCh chan string
|
||||
// cancelWatchCh is a channel used to signal the cancellation of the
|
||||
// registered watch API.
|
||||
cancelWatchCh chan struct{}
|
||||
}
|
||||
|
||||
func newTestXDSClient() *testXDSClient {
|
||||
return &testXDSClient{
|
||||
clusterCh: make(chan string, 1),
|
||||
cancelWatchCh: make(chan struct{}, 1),
|
||||
}
|
||||
}
|
||||
func (tc *testXDSClient) WatchCluster(clusterName string, callback func(xdsclient.CDSUpdate, error)) func() {
|
||||
tc.watchCb = callback
|
||||
tc.clusterCh <- clusterName
|
||||
return tc.cancelWatch
|
||||
}
|
||||
|
||||
func (tc *testXDSClient) Close() {}
|
||||
|
||||
func (tc *testXDSClient) cancelWatch() {
|
||||
tc.cancelWatchCh <- struct{}{}
|
||||
}
|
||||
|
||||
// waitForWatch verifies if the testXDSClient receives a CDS watch API with the
|
||||
// provided clusterName within a reasonable amount of time.
|
||||
func (tc *testXDSClient) waitForWatch(wantCluster string) error {
|
||||
timer := time.NewTimer(defaultTestTimeout)
|
||||
select {
|
||||
case <-timer.C:
|
||||
return errors.New("Timeout when expecting CDS watch call")
|
||||
case gotCluster := <-tc.clusterCh:
|
||||
timer.Stop()
|
||||
if gotCluster != wantCluster {
|
||||
return fmt.Errorf("WatchCluster called with clusterName: %s, want %s", gotCluster, wantCluster)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// waitForCancelWatch verifies if the CDS watch API is cancelled within a
|
||||
// reasonable amount of time.
|
||||
func (tc *testXDSClient) waitForCancelWatch() error {
|
||||
timer := time.NewTimer(defaultTestTimeout)
|
||||
select {
|
||||
case <-timer.C:
|
||||
return errors.New("Timeout when expecting CDS watch call to be cancelled")
|
||||
case <-tc.cancelWatchCh:
|
||||
timer.Stop()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// cdsWatchInfo wraps the update and the error sent in a CDS watch callback.
|
||||
type cdsWatchInfo struct {
|
||||
update xdsclient.CDSUpdate
|
||||
err error
|
||||
}
|
||||
|
||||
// invokeWatchCb invokes the CDS watch callback registered by the cdsBalancer
|
||||
// and waits for appropriate state to be pushed to the provided edsBalancer.
|
||||
func (tc *testXDSClient) invokeWatchCb(cdsW cdsWatchInfo, wantCCS balancer.ClientConnState, edsB *testEDSBalancer) error {
|
||||
tc.watchCb(cdsW.update, cdsW.err)
|
||||
if cdsW.err != nil {
|
||||
return edsB.waitForResolverError(cdsW.err)
|
||||
}
|
||||
return edsB.waitForClientConnUpdate(wantCCS)
|
||||
}
|
||||
|
||||
// testEDSBalancer is a fake edsBalancer used to verify different actions from
|
||||
// the cdsBalancer. It contains a bunch of channels to signal different events
|
||||
// to the test.
|
||||
type testEDSBalancer struct {
|
||||
// ccsCh is a channel used to signal the receipt of a ClientConn update.
|
||||
ccsCh chan balancer.ClientConnState
|
||||
// scStateCh is a channel used to signal the receipt of a SubConn update.
|
||||
scStateCh chan subConnWithState
|
||||
// resolverErrCh is a channel used to signal a resolver error.
|
||||
resolverErrCh chan error
|
||||
// closeCh is a channel used to signal the closing of this balancer.
|
||||
closeCh chan struct{}
|
||||
}
|
||||
|
||||
type subConnWithState struct {
|
||||
sc balancer.SubConn
|
||||
state balancer.SubConnState
|
||||
}
|
||||
|
||||
func newTestEDSBalancer() *testEDSBalancer {
|
||||
return &testEDSBalancer{
|
||||
ccsCh: make(chan balancer.ClientConnState, 1),
|
||||
scStateCh: make(chan subConnWithState, 1),
|
||||
resolverErrCh: make(chan error, 1),
|
||||
closeCh: make(chan struct{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func (tb *testEDSBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
|
||||
tb.ccsCh <- ccs
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tb *testEDSBalancer) ResolverError(err error) {
|
||||
tb.resolverErrCh <- err
|
||||
}
|
||||
|
||||
func (tb *testEDSBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
|
||||
tb.scStateCh <- subConnWithState{sc: sc, state: state}
|
||||
}
|
||||
|
||||
func (tb *testEDSBalancer) Close() {
|
||||
tb.closeCh <- struct{}{}
|
||||
}
|
||||
|
||||
// waitForClientConnUpdate verifies if the testEDSBalancer receives the
|
||||
// provided ClientConnState within a reasonable amount of time.
|
||||
func (tb *testEDSBalancer) waitForClientConnUpdate(wantCCS balancer.ClientConnState) error {
|
||||
timer := time.NewTimer(defaultTestTimeout)
|
||||
select {
|
||||
case <-timer.C:
|
||||
return errors.New("Timeout when expecting ClientConn update on EDS balancer")
|
||||
case gotCCS := <-tb.ccsCh:
|
||||
timer.Stop()
|
||||
if !reflect.DeepEqual(gotCCS, wantCCS) {
|
||||
return fmt.Errorf("received ClientConnState: %+v, want %+v", gotCCS, wantCCS)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// waitForSubConnUpdate verifies if the testEDSBalancer receives the provided
|
||||
// SubConn update within a reasonable amount of time.
|
||||
func (tb *testEDSBalancer) waitForSubConnUpdate(wantSCS subConnWithState) error {
|
||||
timer := time.NewTimer(defaultTestTimeout)
|
||||
select {
|
||||
case <-timer.C:
|
||||
return errors.New("Timeout when expecting SubConn update on EDS balancer")
|
||||
case gotSCS := <-tb.scStateCh:
|
||||
timer.Stop()
|
||||
if !reflect.DeepEqual(gotSCS, wantSCS) {
|
||||
return fmt.Errorf("received SubConnState: %+v, want %+v", gotSCS, wantSCS)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// waitForResolverError verifies if the testEDSBalancer receives the
|
||||
// provided resolver error within a reasonable amount of time.
|
||||
func (tb *testEDSBalancer) waitForResolverError(wantErr error) error {
|
||||
timer := time.NewTimer(defaultTestTimeout)
|
||||
select {
|
||||
case <-timer.C:
|
||||
return errors.New("Timeout when expecting a resolver error")
|
||||
case gotErr := <-tb.resolverErrCh:
|
||||
timer.Stop()
|
||||
if gotErr != wantErr {
|
||||
return fmt.Errorf("received resolver error: %v, want %v", gotErr, wantErr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// waitForClose verifies that the edsBalancer is closed with a reasonable
|
||||
// amount of time.
|
||||
func (tb *testEDSBalancer) waitForClose() error {
|
||||
timer := time.NewTimer(defaultTestTimeout)
|
||||
select {
|
||||
case <-timer.C:
|
||||
return errors.New("Timeout when expecting a close")
|
||||
case <-tb.closeCh:
|
||||
timer.Stop()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// cdsCCS is a helper function to construct a good update passed from the
|
||||
// xdsResolver to the cdsBalancer.
|
||||
func cdsCCS(cluster string, xdsClient interface{}) balancer.ClientConnState {
|
||||
const cdsLBConfig = `{
|
||||
"loadBalancingConfig":[
|
||||
{
|
||||
"experimental_cds":{
|
||||
"Cluster": %s
|
||||
}
|
||||
}
|
||||
]
|
||||
}`
|
||||
jsonSC := fmt.Sprintf(cdsLBConfig, cluster)
|
||||
return balancer.ClientConnState{
|
||||
ResolverState: resolver.State{
|
||||
ServiceConfig: internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(jsonSC),
|
||||
Attributes: attributes.New(xdsinternal.XDSClientID, xdsClient),
|
||||
},
|
||||
BalancerConfig: &lbConfig{ClusterName: clusterName},
|
||||
}
|
||||
}
|
||||
|
||||
// edsCCS is a helper function to construct a good update passed from the
|
||||
// cdsBalancer to the edsBalancer.
|
||||
func edsCCS(service string, enableLRS bool, xdsClient interface{}) balancer.ClientConnState {
|
||||
lbCfg := &xdsbalancer.XDSConfig{EDSServiceName: service}
|
||||
if enableLRS {
|
||||
lbCfg.LrsLoadReportingServerName = new(string)
|
||||
}
|
||||
return balancer.ClientConnState{
|
||||
ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsClient)},
|
||||
BalancerConfig: lbCfg,
|
||||
}
|
||||
}
|
||||
|
||||
// setup creates a cdsBalancer and an edsBalancer (and overrides the
|
||||
// newEDSBalancer function to return it), and also returns a cleanup function.
|
||||
func setup() (*cdsBalancer, *testEDSBalancer, func()) {
|
||||
builder := cdsBB{}
|
||||
tcc := &testClientConn{}
|
||||
cdsB := builder.Build(tcc, balancer.BuildOptions{}).(balancer.V2Balancer)
|
||||
|
||||
edsB := newTestEDSBalancer()
|
||||
oldEDSBalancerBuilder := newEDSBalancer
|
||||
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.V2Balancer {
|
||||
return edsB
|
||||
}
|
||||
|
||||
return cdsB.(*cdsBalancer), edsB, func() {
|
||||
newEDSBalancer = oldEDSBalancerBuilder
|
||||
}
|
||||
}
|
||||
|
||||
// setupWithWatch does everything that setup does, and also pushes a ClientConn
|
||||
// update to the cdsBalancer and waits for a CDS watch call to be registered.
|
||||
func setupWithWatch(t *testing.T) (*testXDSClient, *cdsBalancer, *testEDSBalancer, func()) {
|
||||
t.Helper()
|
||||
|
||||
xdsC := newTestXDSClient()
|
||||
cdsB, edsB, cancel := setup()
|
||||
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil {
|
||||
t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err)
|
||||
}
|
||||
if err := xdsC.waitForWatch(clusterName); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return xdsC, cdsB, edsB, cancel
|
||||
}
|
||||
|
||||
// TestUpdateClientConnState invokes the UpdateClientConnState method on the
|
||||
// cdsBalancer with different inputs and verifies that the CDS watch API on the
|
||||
// provided xdsClient is invoked appropriately.
|
||||
func TestUpdateClientConnState(t *testing.T) {
|
||||
xdsC := newTestXDSClient()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
ccs balancer.ClientConnState
|
||||
wantErr error
|
||||
wantCluster string
|
||||
}{
|
||||
{
|
||||
name: "bad-lbCfg-type",
|
||||
ccs: balancer.ClientConnState{BalancerConfig: nil},
|
||||
wantErr: balancer.ErrBadResolverState,
|
||||
},
|
||||
{
|
||||
name: "empty-cluster-in-lbCfg",
|
||||
ccs: balancer.ClientConnState{BalancerConfig: &lbConfig{ClusterName: ""}},
|
||||
wantErr: balancer.ErrBadResolverState,
|
||||
},
|
||||
{
|
||||
name: "no-xdsClient-in-attributes",
|
||||
ccs: balancer.ClientConnState{
|
||||
ResolverState: resolver.State{
|
||||
Attributes: attributes.New("key", "value"),
|
||||
},
|
||||
BalancerConfig: &lbConfig{ClusterName: clusterName},
|
||||
},
|
||||
wantErr: balancer.ErrBadResolverState,
|
||||
},
|
||||
{
|
||||
name: "bad-xdsClient-in-attributes",
|
||||
ccs: balancer.ClientConnState{
|
||||
ResolverState: resolver.State{
|
||||
Attributes: attributes.New(xdsinternal.XDSClientID, "value"),
|
||||
},
|
||||
BalancerConfig: &lbConfig{ClusterName: clusterName},
|
||||
},
|
||||
wantErr: balancer.ErrBadResolverState,
|
||||
},
|
||||
{
|
||||
name: "happy-good-case",
|
||||
ccs: cdsCCS(clusterName, xdsC),
|
||||
wantCluster: clusterName,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
cdsB, _, cancel := setup()
|
||||
defer func() {
|
||||
cancel()
|
||||
cdsB.Close()
|
||||
}()
|
||||
|
||||
if err := cdsB.UpdateClientConnState(test.ccs); err != test.wantErr {
|
||||
t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err)
|
||||
}
|
||||
if test.wantErr != nil {
|
||||
// When we wanted an error and got it, we should return early.
|
||||
return
|
||||
}
|
||||
if err := xdsC.waitForWatch(test.wantCluster); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestUpdateClientConnStateAfterClose invokes the UpdateClientConnState method
|
||||
// on the cdsBalancer after close and verifies that it returns an error.
|
||||
func TestUpdateClientConnStateAfterClose(t *testing.T) {
|
||||
cdsB, _, cancel := setup()
|
||||
defer cancel()
|
||||
cdsB.Close()
|
||||
|
||||
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, newTestXDSClient())); err != errBalancerClosed {
|
||||
t.Fatalf("UpdateClientConnState() after close returned %v, want %v", err, errBalancerClosed)
|
||||
}
|
||||
}
|
||||
|
||||
// TestUpdateClientConnStateWithSameState verifies that a ClientConnState
|
||||
// update with the same cluster and xdsClient does not cause the cdsBalancer to
|
||||
// create a new watch.
|
||||
func TestUpdateClientConnStateWithSameState(t *testing.T) {
|
||||
xdsC, cdsB, _, cancel := setupWithWatch(t)
|
||||
defer func() {
|
||||
cancel()
|
||||
cdsB.Close()
|
||||
}()
|
||||
|
||||
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil {
|
||||
t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err)
|
||||
}
|
||||
if err := xdsC.waitForWatch(clusterName); err == nil {
|
||||
t.Fatal("Waiting for WatchCluster() should have timed out, but returned with nil error")
|
||||
}
|
||||
}
|
||||
|
||||
// TestHandleClusterUpdate invokes the registered CDS watch callback with
|
||||
// different updates and verifies that the expect ClientConnState is propagated
|
||||
// to the edsBalancer.
|
||||
func TestHandleClusterUpdate(t *testing.T) {
|
||||
xdsC, cdsB, edsB, cancel := setupWithWatch(t)
|
||||
defer func() {
|
||||
cancel()
|
||||
cdsB.Close()
|
||||
}()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
cdsUpdate xdsclient.CDSUpdate
|
||||
updateErr error
|
||||
wantCCS balancer.ClientConnState
|
||||
}{
|
||||
{
|
||||
name: "happy-case-with-lrs",
|
||||
cdsUpdate: xdsclient.CDSUpdate{ServiceName: serviceName, EnableLRS: true},
|
||||
wantCCS: edsCCS(serviceName, true, xdsC),
|
||||
},
|
||||
{
|
||||
name: "happy-case-without-lrs",
|
||||
cdsUpdate: xdsclient.CDSUpdate{ServiceName: serviceName},
|
||||
wantCCS: edsCCS(serviceName, false, xdsC),
|
||||
},
|
||||
{
|
||||
name: "cdsWatch-returns-error",
|
||||
updateErr: errors.New("cdsUpdate error"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
if err := xdsC.invokeWatchCb(cdsWatchInfo{test.cdsUpdate, test.updateErr}, test.wantCCS, edsB); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestResolverError verifies that an existing watch is cancelled when a
|
||||
// resolver error is received by the cdsBalancer, and also that the same error
|
||||
// is propagated to the edsBalancer.
|
||||
func TestResolverError(t *testing.T) {
|
||||
xdsC, cdsB, edsB, cancel := setupWithWatch(t)
|
||||
defer func() {
|
||||
cancel()
|
||||
cdsB.Close()
|
||||
}()
|
||||
|
||||
cdsUpdate := xdsclient.CDSUpdate{ServiceName: serviceName}
|
||||
wantCCS := edsCCS(serviceName, false, xdsC)
|
||||
if err := xdsC.invokeWatchCb(cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
rErr := errors.New("cdsBalancer resolver error")
|
||||
cdsB.ResolverError(rErr)
|
||||
if err := xdsC.waitForCancelWatch(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := edsB.waitForResolverError(rErr); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestUpdateSubConnState pushes a SubConn update to the cdsBalancer and
|
||||
// verifies that the update is propagated to the edsBalancer.
|
||||
func TestUpdateSubConnState(t *testing.T) {
|
||||
xdsC, cdsB, edsB, cancel := setupWithWatch(t)
|
||||
defer func() {
|
||||
cancel()
|
||||
cdsB.Close()
|
||||
}()
|
||||
|
||||
cdsUpdate := xdsclient.CDSUpdate{ServiceName: serviceName}
|
||||
wantCCS := edsCCS(serviceName, false, xdsC)
|
||||
if err := xdsC.invokeWatchCb(cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var sc balancer.SubConn
|
||||
state := balancer.SubConnState{ConnectivityState: connectivity.Ready}
|
||||
cdsB.UpdateSubConnState(sc, state)
|
||||
if err := edsB.waitForSubConnUpdate(subConnWithState{sc: sc, state: state}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestClose calls Close() on the cdsBalancer, and verifies that the underlying
|
||||
// edsBalancer is also closed.
|
||||
func TestClose(t *testing.T) {
|
||||
xdsC, cdsB, edsB, cancel := setupWithWatch(t)
|
||||
defer cancel()
|
||||
|
||||
cdsUpdate := xdsclient.CDSUpdate{ServiceName: serviceName}
|
||||
wantCCS := edsCCS(serviceName, false, xdsC)
|
||||
if err := xdsC.invokeWatchCb(cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cdsB.Close()
|
||||
if err := xdsC.waitForCancelWatch(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := edsB.waitForClose(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestParseConfig exercises the config parsing functionality in the cds
|
||||
// balancer builder.
|
||||
func TestParseConfig(t *testing.T) {
|
||||
bb := cdsBB{}
|
||||
if gotName := bb.Name(); gotName != cdsName {
|
||||
t.Fatalf("cdsBB.Name() = %v, want %v", gotName, cdsName)
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
input json.RawMessage
|
||||
wantCfg serviceconfig.LoadBalancingConfig
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "good-lb-config",
|
||||
input: json.RawMessage(`{"Cluster": "cluster1"}`),
|
||||
wantCfg: &lbConfig{ClusterName: clusterName},
|
||||
},
|
||||
{
|
||||
name: "unknown-fields-in-lb-config",
|
||||
input: json.RawMessage(`{"Unknown": "foobar"}`),
|
||||
wantCfg: &lbConfig{ClusterName: ""},
|
||||
},
|
||||
{
|
||||
name: "empty-lb-config",
|
||||
input: json.RawMessage(""),
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
gotCfg, gotErr := bb.ParseConfig(test.input)
|
||||
if (gotErr != nil) != test.wantErr {
|
||||
t.Fatalf("bb.ParseConfig(%v) = %v, wantErr %v", string(test.input), gotErr, test.wantErr)
|
||||
}
|
||||
if !test.wantErr {
|
||||
if !reflect.DeepEqual(gotCfg, test.wantCfg) {
|
||||
t.Fatalf("bb.ParseConfig(%v) = %v, want %v", string(test.input), gotCfg, test.wantCfg)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -330,7 +330,10 @@ func (x *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Sub
|
||||
}
|
||||
|
||||
func (x *edsBalancer) ResolverError(error) {
|
||||
// Ignore for now
|
||||
// TODO: Need to distinguish between connection errors and resource removed
|
||||
// errors. For the former, we will need to handle it later on for fallback.
|
||||
// For the latter, handle it by stopping the watch, closing sub-balancers
|
||||
// and pickers.
|
||||
}
|
||||
|
||||
func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
|
||||
|
Reference in New Issue
Block a user