xds Balancer: implementation (#2614)
This commit is contained in:
58
balancer/xds/edsbalancer/edsbalancer.go
Normal file
58
balancer/xds/edsbalancer/edsbalancer.go
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
/*
|
||||||
|
*
|
||||||
|
* Copyright 2019 gRPC authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package edsbalancer
|
||||||
|
|
||||||
|
// TODO: this file is used as a place holder. It should be deleted after edsbalancer implementation
|
||||||
|
// is merged.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
||||||
|
"google.golang.org/grpc/balancer"
|
||||||
|
"google.golang.org/grpc/connectivity"
|
||||||
|
"google.golang.org/grpc/resolver"
|
||||||
|
)
|
||||||
|
|
||||||
|
type dummyEdsBalancer struct{}
|
||||||
|
|
||||||
|
func (d *dummyEdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyEdsBalancer) HandleResolvedAddrs([]resolver.Address, error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyEdsBalancer) Close() {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyEdsBalancer) HandleEDSResponse(edsResp *v2.ClusterLoadAssignment) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyEdsBalancer) HandleChildPolicy(name string, config json.RawMessage) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewXDSBalancer creates an edsBalancer
|
||||||
|
func NewXDSBalancer(cc balancer.ClientConn) interface{} {
|
||||||
|
return &dummyEdsBalancer{}
|
||||||
|
}
|
@ -1,4 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
|
*
|
||||||
* Copyright 2019 gRPC authors.
|
* Copyright 2019 gRPC authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
@ -12,7 +13,588 @@
|
|||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
// Package xds implements xds balancer.
|
// Package xds implements a balancer that communicates with a remote balancer using the Envoy xDS
|
||||||
package xds // import "google.golang.org/grpc/balancer/xds"
|
// protocol.
|
||||||
|
package xds
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
||||||
|
"github.com/gogo/protobuf/proto"
|
||||||
|
"google.golang.org/grpc/balancer"
|
||||||
|
"google.golang.org/grpc/balancer/xds/edsbalancer"
|
||||||
|
"google.golang.org/grpc/connectivity"
|
||||||
|
"google.golang.org/grpc/grpclog"
|
||||||
|
"google.golang.org/grpc/resolver"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultTimeout = 10 * time.Second
|
||||||
|
xdsName = "xds"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// This field is for testing purpose.
|
||||||
|
// TODO: if later we make startupTimeout configurable through BuildOptions(maybe?), then we can remove
|
||||||
|
// this field and configure through BuildOptions instead.
|
||||||
|
startupTimeout = defaultTimeout
|
||||||
|
newEDSBalancer = edsbalancer.NewXDSBalancer
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
balancer.Register(newXDSBalancerBuilder())
|
||||||
|
}
|
||||||
|
|
||||||
|
type xdsBalancerBuilder struct{}
|
||||||
|
|
||||||
|
func newXDSBalancerBuilder() balancer.Builder {
|
||||||
|
return &xdsBalancerBuilder{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *xdsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
x := &xdsBalancer{
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
|
buildOpts: opts,
|
||||||
|
startupTimeout: startupTimeout,
|
||||||
|
connStateMgr: &connStateMgr{},
|
||||||
|
startup: true,
|
||||||
|
grpcUpdate: make(chan interface{}),
|
||||||
|
xdsClientUpdate: make(chan interface{}),
|
||||||
|
timer: createDrainedTimer(), // initialized a timer that won't fire without reset
|
||||||
|
}
|
||||||
|
x.cc = &xdsClientConn{
|
||||||
|
updateState: x.connStateMgr.updateState,
|
||||||
|
ClientConn: cc,
|
||||||
|
}
|
||||||
|
go x.run()
|
||||||
|
return x
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *xdsBalancerBuilder) Name() string {
|
||||||
|
return xdsName
|
||||||
|
}
|
||||||
|
|
||||||
|
// EdsBalancer defines the interface that edsBalancer must implement to communicate with xdsBalancer.
|
||||||
|
type EdsBalancer interface {
|
||||||
|
balancer.Balancer
|
||||||
|
// HandleEDSResponse passes the received EDS message from traffic director to eds balancer.
|
||||||
|
HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment)
|
||||||
|
// HandleChildPolicy updates the eds balancer the intra-cluster load balancing policy to use.
|
||||||
|
HandleChildPolicy(name string, config json.RawMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
// xdsBalancer manages xdsClient and the actual balancer that does load balancing (either edsBalancer,
|
||||||
|
// or fallback LB).
|
||||||
|
type xdsBalancer struct {
|
||||||
|
cc balancer.ClientConn // *xdsClientConn
|
||||||
|
buildOpts balancer.BuildOptions
|
||||||
|
startupTimeout time.Duration
|
||||||
|
xdsStaleTimeout *time.Duration
|
||||||
|
connStateMgr *connStateMgr
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
startup bool // startup indicates whether this xdsBalancer is in startup stage.
|
||||||
|
inFallbackMonitor bool
|
||||||
|
|
||||||
|
// xdsBalancer continuously monitor the channels below, and will handle events from them in sync.
|
||||||
|
grpcUpdate chan interface{}
|
||||||
|
xdsClientUpdate chan interface{}
|
||||||
|
timer *time.Timer
|
||||||
|
noSubConnAlert <-chan struct{}
|
||||||
|
|
||||||
|
client *client // may change when passed a different service config
|
||||||
|
config *xdsConfig // may change when passed a different service config
|
||||||
|
xdsLB EdsBalancer
|
||||||
|
fallbackLB balancer.Balancer
|
||||||
|
fallbackInitData *addressUpdate // may change when HandleResolved address is called
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *xdsBalancer) startNewXDSClient(u *xdsConfig) {
|
||||||
|
// If the xdsBalancer is in startup stage, then we need to apply the startup timeout for the first
|
||||||
|
// xdsClient to get a response from the traffic director.
|
||||||
|
if x.startup {
|
||||||
|
x.startFallbackMonitoring()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Whenever service config gives a new traffic director name, we need to create an xds client to
|
||||||
|
// connect to it. However, previous xds client should not be closed until the new one successfully
|
||||||
|
// connects to the traffic director (i.e. get an ADS response from the traffic director). Therefore,
|
||||||
|
// we let each new client to be responsible to close its immediate predecessor. In this way,
|
||||||
|
// xdsBalancer does not to implement complex synchronization to achieve the same purpose.
|
||||||
|
prevClient := x.client
|
||||||
|
// haveGotADS is true means, this xdsClient has got ADS response from director in the past, which
|
||||||
|
// means it can close previous client if it hasn't and it now can send lose contact signal for
|
||||||
|
// fallback monitoring.
|
||||||
|
var haveGotADS bool
|
||||||
|
|
||||||
|
// set up callbacks for the xds client.
|
||||||
|
newADS := func(ctx context.Context, resp proto.Message) error {
|
||||||
|
if !haveGotADS {
|
||||||
|
if prevClient != nil {
|
||||||
|
prevClient.close()
|
||||||
|
}
|
||||||
|
haveGotADS = true
|
||||||
|
}
|
||||||
|
return x.newADSResponse(ctx, resp)
|
||||||
|
}
|
||||||
|
loseContact := func(ctx context.Context) {
|
||||||
|
// loseContact signal is only useful when the current xds client has received ADS response before,
|
||||||
|
// and has not been closed by later xds client.
|
||||||
|
if haveGotADS {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
x.loseContact(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
exitCleanup := func() {
|
||||||
|
// Each xds client is responsible to close its predecessor if there's one. There are two paths
|
||||||
|
// for a xds client to close its predecessor:
|
||||||
|
// 1. Once it receives its first ADS response.
|
||||||
|
// 2. It hasn't received its first ADS response yet, but its own successor has received ADS
|
||||||
|
// response (which triggers the exit of it). Therefore, it needs to close its predecessor if
|
||||||
|
// it has one.
|
||||||
|
// Here the exitCleanup is for the 2nd path.
|
||||||
|
if !haveGotADS && prevClient != nil {
|
||||||
|
prevClient.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
x.client = newXDSClient(u.BalancerName, x.cc.Target(), u.ChildPolicy == nil, x.buildOpts, newADS, loseContact, exitCleanup)
|
||||||
|
go x.client.run()
|
||||||
|
}
|
||||||
|
|
||||||
|
// run gets executed in a goroutine once xdsBalancer is created. It monitors updates from grpc,
|
||||||
|
// xdsClient and load balancer. It synchronizes the operations that happen inside xdsBalancer. It
|
||||||
|
// exits when xdsBalancer is closed.
|
||||||
|
func (x *xdsBalancer) run() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case update := <-x.grpcUpdate:
|
||||||
|
x.handleGRPCUpdate(update)
|
||||||
|
case update := <-x.xdsClientUpdate:
|
||||||
|
x.handleXDSClientUpdate(update)
|
||||||
|
case <-x.timer.C: // x.timer.C will block if we are not in fallback monitoring stage.
|
||||||
|
x.switchFallback()
|
||||||
|
case <-x.noSubConnAlert: // x.noSubConnAlert will block if we are not in fallback monitoring stage.
|
||||||
|
x.switchFallback()
|
||||||
|
case <-x.ctx.Done():
|
||||||
|
if x.client != nil {
|
||||||
|
x.client.close()
|
||||||
|
}
|
||||||
|
if x.xdsLB != nil {
|
||||||
|
x.xdsLB.Close()
|
||||||
|
}
|
||||||
|
if x.fallbackLB != nil {
|
||||||
|
x.fallbackLB.Close()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *xdsBalancer) handleGRPCUpdate(update interface{}) {
|
||||||
|
switch u := update.(type) {
|
||||||
|
case *addressUpdate:
|
||||||
|
if x.fallbackLB != nil {
|
||||||
|
x.fallbackLB.HandleResolvedAddrs(u.addrs, u.err)
|
||||||
|
}
|
||||||
|
x.fallbackInitData = u
|
||||||
|
case *subConnStateUpdate:
|
||||||
|
if x.xdsLB != nil {
|
||||||
|
x.xdsLB.HandleSubConnStateChange(u.sc, u.state)
|
||||||
|
}
|
||||||
|
if x.fallbackLB != nil {
|
||||||
|
x.fallbackLB.HandleSubConnStateChange(u.sc, u.state)
|
||||||
|
}
|
||||||
|
case *xdsConfig:
|
||||||
|
if x.config == nil {
|
||||||
|
// The first time we get config, we just need to start the xdsClient.
|
||||||
|
x.startNewXDSClient(u)
|
||||||
|
x.config = u
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// With a different BalancerName, we need to create a new xdsClient.
|
||||||
|
// If current or previous ChildPolicy is nil, then we also need to recreate a new xdsClient.
|
||||||
|
// This is because with nil ChildPolicy xdsClient will do CDS request, while non-nil won't.
|
||||||
|
if u.BalancerName != x.config.BalancerName || (u.ChildPolicy == nil) != (x.config.ChildPolicy == nil) {
|
||||||
|
x.startNewXDSClient(u)
|
||||||
|
}
|
||||||
|
// We will update the xdsLB with the new child policy, if we got a different one and it's not nil.
|
||||||
|
// The nil case will be handled when the CDS response gets processed, we will update xdsLB at that time.
|
||||||
|
if !reflect.DeepEqual(u.ChildPolicy, x.config.ChildPolicy) && u.ChildPolicy != nil && x.xdsLB != nil {
|
||||||
|
x.xdsLB.HandleChildPolicy(u.ChildPolicy.Name, u.ChildPolicy.Config)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(u.FallBackPolicy, x.config.FallBackPolicy) && x.fallbackLB != nil {
|
||||||
|
x.fallbackLB.Close()
|
||||||
|
x.startFallBackBalancer(u)
|
||||||
|
}
|
||||||
|
x.config = u
|
||||||
|
default:
|
||||||
|
// unreachable path
|
||||||
|
panic("wrong update type")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *xdsBalancer) handleXDSClientUpdate(update interface{}) {
|
||||||
|
switch u := update.(type) {
|
||||||
|
case *cdsResp:
|
||||||
|
select {
|
||||||
|
case <-u.ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
x.cancelFallbackAndSwitchEDSBalancerIfNecessary()
|
||||||
|
// TODO: Get the optional xds record stale timeout from OutlierDetection message. If not exist,
|
||||||
|
// reset to 0.
|
||||||
|
// x.xdsStaleTimeout = u.OutlierDetection.TO_BE_DEFINED_AND_ADDED
|
||||||
|
x.xdsLB.HandleChildPolicy(u.resp.LbPolicy.String(), nil)
|
||||||
|
case *edsResp:
|
||||||
|
select {
|
||||||
|
case <-u.ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
x.cancelFallbackAndSwitchEDSBalancerIfNecessary()
|
||||||
|
x.xdsLB.HandleEDSResponse(u.resp)
|
||||||
|
case *loseContact:
|
||||||
|
select {
|
||||||
|
case <-u.ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
// if we are already doing fallback monitoring, then we ignore new loseContact signal.
|
||||||
|
if x.inFallbackMonitor {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
x.inFallbackMonitor = true
|
||||||
|
x.startFallbackMonitoring()
|
||||||
|
default:
|
||||||
|
panic("unexpected xds client update type")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type connStateMgr struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
curState connectivity.State
|
||||||
|
notify chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *connStateMgr) updateState(s connectivity.State) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
c.curState = s
|
||||||
|
if s != connectivity.Ready && c.notify != nil {
|
||||||
|
close(c.notify)
|
||||||
|
c.notify = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *connStateMgr) notifyWhenNotReady() <-chan struct{} {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
if c.curState != connectivity.Ready {
|
||||||
|
ch := make(chan struct{})
|
||||||
|
close(ch)
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
c.notify = make(chan struct{})
|
||||||
|
return c.notify
|
||||||
|
}
|
||||||
|
|
||||||
|
// xdsClientConn wraps around the balancer.ClientConn passed in from grpc. The wrapping is to add
|
||||||
|
// functionality to get notification when no subconn is in READY state.
|
||||||
|
// TODO: once we have the change that keeps both edsbalancer and fallback balancer alive at the same
|
||||||
|
// time, we need to make sure to synchronize updates from both entities on the ClientConn.
|
||||||
|
type xdsClientConn struct {
|
||||||
|
updateState func(s connectivity.State)
|
||||||
|
balancer.ClientConn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *xdsClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
|
||||||
|
w.updateState(s)
|
||||||
|
w.ClientConn.UpdateBalancerState(s, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
type addressUpdate struct {
|
||||||
|
addrs []resolver.Address
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
type subConnStateUpdate struct {
|
||||||
|
sc balancer.SubConn
|
||||||
|
state connectivity.State
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *xdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
|
||||||
|
update := &subConnStateUpdate{
|
||||||
|
sc: sc,
|
||||||
|
state: state,
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case x.grpcUpdate <- update:
|
||||||
|
case <-x.ctx.Done():
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *xdsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
|
||||||
|
update := &addressUpdate{
|
||||||
|
addrs: addrs,
|
||||||
|
err: err,
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case x.grpcUpdate <- update:
|
||||||
|
case <-x.ctx.Done():
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: once the API is merged, check whether we need to change the function name/signature here.
|
||||||
|
func (x *xdsBalancer) HandleBalancerConfig(config json.RawMessage) error {
|
||||||
|
var cfg xdsConfig
|
||||||
|
if err := json.Unmarshal(config, &cfg); err != nil {
|
||||||
|
return errors.New("unable to unmarshal balancer config into xds config")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case x.grpcUpdate <- &cfg:
|
||||||
|
case <-x.ctx.Done():
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type cdsResp struct {
|
||||||
|
ctx context.Context
|
||||||
|
resp *xdspb.Cluster
|
||||||
|
}
|
||||||
|
|
||||||
|
type edsResp struct {
|
||||||
|
ctx context.Context
|
||||||
|
resp *xdspb.ClusterLoadAssignment
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *xdsBalancer) newADSResponse(ctx context.Context, resp proto.Message) error {
|
||||||
|
var update interface{}
|
||||||
|
switch u := resp.(type) {
|
||||||
|
case *xdspb.Cluster:
|
||||||
|
if u.GetName() != x.cc.Target() {
|
||||||
|
return fmt.Errorf("unmatched service name, got %s, want %s", u.GetName(), x.cc.Target())
|
||||||
|
}
|
||||||
|
if u.GetType() != xdspb.Cluster_EDS {
|
||||||
|
return fmt.Errorf("unexpected service discovery type, got %v, want %v", u.GetType(), xdspb.Cluster_EDS)
|
||||||
|
}
|
||||||
|
update = &cdsResp{ctx: ctx, resp: u}
|
||||||
|
case *xdspb.ClusterLoadAssignment:
|
||||||
|
// nothing to check
|
||||||
|
update = &edsResp{ctx: ctx, resp: u}
|
||||||
|
default:
|
||||||
|
grpclog.Warningf("xdsBalancer: got a response that's neither CDS nor EDS, type = %T", u)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case x.xdsClientUpdate <- update:
|
||||||
|
case <-x.ctx.Done():
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type loseContact struct {
|
||||||
|
ctx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *xdsBalancer) loseContact(ctx context.Context) {
|
||||||
|
select {
|
||||||
|
case x.xdsClientUpdate <- &loseContact{ctx: ctx}:
|
||||||
|
case <-x.ctx.Done():
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *xdsBalancer) switchFallback() {
|
||||||
|
if x.xdsLB != nil {
|
||||||
|
x.xdsLB.Close()
|
||||||
|
x.xdsLB = nil
|
||||||
|
}
|
||||||
|
x.startFallBackBalancer(x.config)
|
||||||
|
x.cancelFallbackMonitoring()
|
||||||
|
}
|
||||||
|
|
||||||
|
// x.cancelFallbackAndSwitchEDSBalancerIfNecessary() will be no-op if we have a working xds client.
|
||||||
|
// It will cancel fallback monitoring if we are in fallback monitoring stage.
|
||||||
|
// If there's no running edsBalancer currently, it will create one and initialize it. Also, it will
|
||||||
|
// shutdown the fallback balancer if there's one running.
|
||||||
|
func (x *xdsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() {
|
||||||
|
// xDS update will cancel fallback monitoring if we are in fallback monitoring stage.
|
||||||
|
x.cancelFallbackMonitoring()
|
||||||
|
|
||||||
|
// xDS update will switch balancer back to edsBalancer if we are in fallback.
|
||||||
|
if x.xdsLB == nil {
|
||||||
|
if x.fallbackLB != nil {
|
||||||
|
x.fallbackLB.Close()
|
||||||
|
x.fallbackLB = nil
|
||||||
|
}
|
||||||
|
x.xdsLB = newEDSBalancer(x.cc).(EdsBalancer)
|
||||||
|
if x.config.ChildPolicy != nil {
|
||||||
|
x.xdsLB.HandleChildPolicy(x.config.ChildPolicy.Name, x.config.ChildPolicy.Config)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *xdsBalancer) startFallBackBalancer(c *xdsConfig) {
|
||||||
|
if c.FallBackPolicy == nil {
|
||||||
|
x.startFallBackBalancer(&xdsConfig{
|
||||||
|
FallBackPolicy: &loadBalancingConfig{
|
||||||
|
Name: "round_robin",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// builder will always be non-nil, since when parse JSON into xdsConfig, we check whether the specified
|
||||||
|
// balancer is registered or not.
|
||||||
|
builder := balancer.Get(c.FallBackPolicy.Name)
|
||||||
|
x.fallbackLB = builder.Build(x.cc, x.buildOpts)
|
||||||
|
if x.fallbackInitData != nil {
|
||||||
|
// TODO: uncomment when HandleBalancerConfig API is merged.
|
||||||
|
//x.fallbackLB.HandleBalancerConfig(c.FallBackPolicy.Config)
|
||||||
|
x.fallbackLB.HandleResolvedAddrs(x.fallbackInitData.addrs, x.fallbackInitData.err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// There are three ways that could lead to fallback:
|
||||||
|
// 1. During startup (i.e. the first xds client is just created and attempts to contact the traffic
|
||||||
|
// director), fallback if it has not received any response from the director within the configured
|
||||||
|
// timeout.
|
||||||
|
// 2. After xds client loses contact with the remote, fallback if all connections to the backends are
|
||||||
|
// lost (i.e. not in state READY).
|
||||||
|
// 3. After xds client loses contact with the remote, fallback if the stale eds timeout has been
|
||||||
|
// configured through CDS and is timed out.
|
||||||
|
func (x *xdsBalancer) startFallbackMonitoring() {
|
||||||
|
if x.startup {
|
||||||
|
x.startup = false
|
||||||
|
x.timer.Reset(x.startupTimeout)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
x.noSubConnAlert = x.connStateMgr.notifyWhenNotReady()
|
||||||
|
if x.xdsStaleTimeout != nil {
|
||||||
|
if !x.timer.Stop() {
|
||||||
|
<-x.timer.C
|
||||||
|
}
|
||||||
|
x.timer.Reset(*x.xdsStaleTimeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// There are two cases where fallback monitoring should be canceled:
|
||||||
|
// 1. xDS client returns a new ADS message.
|
||||||
|
// 2. fallback has been triggered.
|
||||||
|
func (x *xdsBalancer) cancelFallbackMonitoring() {
|
||||||
|
if !x.timer.Stop() {
|
||||||
|
select {
|
||||||
|
case <-x.timer.C:
|
||||||
|
// For cases where some fallback condition happens along with the timeout, but timeout loses
|
||||||
|
// the race, so we need to drain the x.timer.C. thus we don't trigger fallback again.
|
||||||
|
default:
|
||||||
|
// if the timer timeout leads us here, then there's no thing to drain from x.timer.C.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
x.noSubConnAlert = nil
|
||||||
|
x.inFallbackMonitor = false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *xdsBalancer) Close() {
|
||||||
|
x.cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
func createDrainedTimer() *time.Timer {
|
||||||
|
timer := time.NewTimer(0 * time.Millisecond)
|
||||||
|
// make sure initially the timer channel is blocking until reset.
|
||||||
|
if !timer.Stop() {
|
||||||
|
<-timer.C
|
||||||
|
}
|
||||||
|
return timer
|
||||||
|
}
|
||||||
|
|
||||||
|
type xdsConfig struct {
|
||||||
|
BalancerName string
|
||||||
|
ChildPolicy *loadBalancingConfig
|
||||||
|
FallBackPolicy *loadBalancingConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
// When unmarshalling json to xdsConfig, we iterate through the childPolicy/fallbackPolicy lists
|
||||||
|
// and select the first LB policy which has been registered to be stored in the returned xdsConfig.
|
||||||
|
func (p *xdsConfig) UnmarshalJSON(data []byte) error {
|
||||||
|
var val map[string]json.RawMessage
|
||||||
|
if err := json.Unmarshal(data, &val); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for k, v := range val {
|
||||||
|
switch k {
|
||||||
|
case "BalancerName":
|
||||||
|
if err := json.Unmarshal(v, &p.BalancerName); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case "ChildPolicy":
|
||||||
|
var lbcfgs []*loadBalancingConfig
|
||||||
|
if err := json.Unmarshal(v, &lbcfgs); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, lbcfg := range lbcfgs {
|
||||||
|
if balancer.Get(lbcfg.Name) != nil {
|
||||||
|
p.ChildPolicy = lbcfg
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case "FallbackPolicy":
|
||||||
|
var lbcfgs []*loadBalancingConfig
|
||||||
|
if err := json.Unmarshal(v, &lbcfgs); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, lbcfg := range lbcfgs {
|
||||||
|
if balancer.Get(lbcfg.Name) != nil {
|
||||||
|
p.FallBackPolicy = lbcfg
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *xdsConfig) MarshalJSON() ([]byte, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type loadBalancingConfig struct {
|
||||||
|
Name string
|
||||||
|
Config json.RawMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *loadBalancingConfig) MarshalJSON() ([]byte, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *loadBalancingConfig) UnmarshalJSON(data []byte) error {
|
||||||
|
var cfg map[string]json.RawMessage
|
||||||
|
if err := json.Unmarshal(data, &cfg); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for name, config := range cfg {
|
||||||
|
l.Name = name
|
||||||
|
l.Config = config
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
264
balancer/xds/xds_client.go
Normal file
264
balancer/xds/xds_client.go
Normal file
@ -0,0 +1,264 @@
|
|||||||
|
/*
|
||||||
|
*
|
||||||
|
* Copyright 2019 gRPC authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package xds
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
||||||
|
xdscorepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
|
||||||
|
xdsdiscoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
|
||||||
|
"github.com/gogo/protobuf/proto"
|
||||||
|
"github.com/gogo/protobuf/types"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/balancer"
|
||||||
|
"google.golang.org/grpc/grpclog"
|
||||||
|
"google.golang.org/grpc/internal/backoff"
|
||||||
|
"google.golang.org/grpc/internal/channelz"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
grpcHostname = "com.googleapis.trafficdirector.grpc_hostname"
|
||||||
|
cdsType = "type.googleapis.com/envoy.api.v2.Cluster"
|
||||||
|
edsType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
|
||||||
|
endpointRequired = "endpoints_required"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
defaultBackoffConfig = backoff.Exponential{
|
||||||
|
MaxDelay: 120 * time.Second,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
// client is responsible for connecting to the specified traffic director, passing the received
|
||||||
|
// ADS response from the traffic director, and sending notification when communication with the
|
||||||
|
// traffic director is lost.
|
||||||
|
type client struct {
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
cli xdsdiscoverypb.AggregatedDiscoveryServiceClient
|
||||||
|
opts balancer.BuildOptions
|
||||||
|
balancerName string // the traffic director name
|
||||||
|
serviceName string // the user dial target name
|
||||||
|
enableCDS bool
|
||||||
|
newADS func(ctx context.Context, resp proto.Message) error
|
||||||
|
loseContact func(ctx context.Context)
|
||||||
|
cleanup func()
|
||||||
|
backoff backoff.Strategy
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
cc *grpc.ClientConn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) run() {
|
||||||
|
c.dial()
|
||||||
|
c.makeADSCall()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) close() {
|
||||||
|
c.cancel()
|
||||||
|
c.mu.Lock()
|
||||||
|
if c.cc != nil {
|
||||||
|
c.cc.Close()
|
||||||
|
}
|
||||||
|
c.mu.Unlock()
|
||||||
|
c.cleanup()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) dial() {
|
||||||
|
var dopts []grpc.DialOption
|
||||||
|
if creds := c.opts.DialCreds; creds != nil {
|
||||||
|
if err := creds.OverrideServerName(c.balancerName); err == nil {
|
||||||
|
dopts = append(dopts, grpc.WithTransportCredentials(creds))
|
||||||
|
} else {
|
||||||
|
grpclog.Warningf("xds: failed to override the server name in the credentials: %v, using Insecure", err)
|
||||||
|
dopts = append(dopts, grpc.WithInsecure())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
dopts = append(dopts, grpc.WithInsecure())
|
||||||
|
}
|
||||||
|
if c.opts.Dialer != nil {
|
||||||
|
dopts = append(dopts, grpc.WithContextDialer(c.opts.Dialer))
|
||||||
|
}
|
||||||
|
// Explicitly set pickfirst as the balancer.
|
||||||
|
dopts = append(dopts, grpc.WithBalancerName(grpc.PickFirstBalancerName))
|
||||||
|
if channelz.IsOn() {
|
||||||
|
dopts = append(dopts, grpc.WithChannelzParentID(c.opts.ChannelzParentID))
|
||||||
|
}
|
||||||
|
|
||||||
|
cc, err := grpc.DialContext(c.ctx, c.balancerName, dopts...)
|
||||||
|
// Since this is a non-blocking dial, so if it fails, it due to some serious error (not network
|
||||||
|
// related) error.
|
||||||
|
if err != nil {
|
||||||
|
grpclog.Fatalf("xds: failed to dial: %v", err)
|
||||||
|
}
|
||||||
|
c.mu.Lock()
|
||||||
|
select {
|
||||||
|
case <-c.ctx.Done():
|
||||||
|
cc.Close()
|
||||||
|
default:
|
||||||
|
// only assign c.cc when xds client has not been closed, to prevent ClientConn leak.
|
||||||
|
c.cc = cc
|
||||||
|
}
|
||||||
|
c.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) newCDSRequest() *xdspb.DiscoveryRequest {
|
||||||
|
cdsReq := &xdspb.DiscoveryRequest{
|
||||||
|
Node: &xdscorepb.Node{
|
||||||
|
Metadata: &types.Struct{
|
||||||
|
Fields: map[string]*types.Value{
|
||||||
|
grpcHostname: {
|
||||||
|
Kind: &types.Value_StringValue{StringValue: c.serviceName},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
TypeUrl: cdsType,
|
||||||
|
}
|
||||||
|
return cdsReq
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) newEDSRequest() *xdspb.DiscoveryRequest {
|
||||||
|
edsReq := &xdspb.DiscoveryRequest{
|
||||||
|
Node: &xdscorepb.Node{
|
||||||
|
Metadata: &types.Struct{
|
||||||
|
Fields: map[string]*types.Value{
|
||||||
|
endpointRequired: {
|
||||||
|
Kind: &types.Value_BoolValue{BoolValue: c.enableCDS},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ResourceNames: []string{c.serviceName},
|
||||||
|
TypeUrl: edsType,
|
||||||
|
}
|
||||||
|
return edsReq
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) makeADSCall() {
|
||||||
|
c.cli = xdsdiscoverypb.NewAggregatedDiscoveryServiceClient(c.cc)
|
||||||
|
retryCount := 0
|
||||||
|
var doRetry bool
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
if doRetry {
|
||||||
|
backoffTimer := time.NewTimer(c.backoff.Backoff(retryCount))
|
||||||
|
select {
|
||||||
|
case <-backoffTimer.C:
|
||||||
|
case <-c.ctx.Done():
|
||||||
|
backoffTimer.Stop()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
retryCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
firstRespReceived := c.adsCallAttempt()
|
||||||
|
if firstRespReceived {
|
||||||
|
retryCount = 0
|
||||||
|
doRetry = false
|
||||||
|
} else {
|
||||||
|
doRetry = true
|
||||||
|
}
|
||||||
|
c.loseContact(c.ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) adsCallAttempt() (firstRespReceived bool) {
|
||||||
|
firstRespReceived = false
|
||||||
|
ctx, cancel := context.WithCancel(c.ctx)
|
||||||
|
defer cancel()
|
||||||
|
st, err := c.cli.StreamAggregatedResources(ctx, grpc.WaitForReady(true))
|
||||||
|
if err != nil {
|
||||||
|
grpclog.Infof("xds: failed to initial ADS streaming RPC due to %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if c.enableCDS {
|
||||||
|
if err := st.Send(c.newCDSRequest()); err != nil {
|
||||||
|
// current stream is broken, start a new one.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := st.Send(c.newEDSRequest()); err != nil {
|
||||||
|
// current stream is broken, start a new one.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
expectCDS := c.enableCDS
|
||||||
|
for {
|
||||||
|
resp, err := st.Recv()
|
||||||
|
if err != nil {
|
||||||
|
// current stream is broken, start a new one.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
firstRespReceived = true
|
||||||
|
resources := resp.GetResources()
|
||||||
|
if len(resources) < 1 {
|
||||||
|
grpclog.Warning("xds: ADS response contains 0 resource info.")
|
||||||
|
// start a new call as server misbehaves by sending a ADS response with 0 resource info.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if resp.GetTypeUrl() == cdsType && !c.enableCDS {
|
||||||
|
grpclog.Warning("xds: received CDS response in custom plugin mode.")
|
||||||
|
// start a new call as we receive CDS response when in EDS-only mode.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var adsResp types.DynamicAny
|
||||||
|
if err := types.UnmarshalAny(&resources[0], &adsResp); err != nil {
|
||||||
|
grpclog.Warningf("xds: failed to unmarshal resources due to %v.", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
switch adsResp.Message.(type) {
|
||||||
|
case *xdspb.Cluster:
|
||||||
|
expectCDS = false
|
||||||
|
case *xdspb.ClusterLoadAssignment:
|
||||||
|
if expectCDS {
|
||||||
|
grpclog.Warningf("xds: expecting CDS response, got EDS response instead.")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := c.newADS(c.ctx, adsResp.Message); err != nil {
|
||||||
|
grpclog.Warningf("xds: processing new ADS message failed due to %v.", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func newXDSClient(balancerName string, serviceName string, enableCDS bool, opts balancer.BuildOptions, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client {
|
||||||
|
c := &client{
|
||||||
|
balancerName: balancerName,
|
||||||
|
serviceName: serviceName,
|
||||||
|
enableCDS: enableCDS,
|
||||||
|
opts: opts,
|
||||||
|
newADS: newADS,
|
||||||
|
loseContact: loseContact,
|
||||||
|
cleanup: exitCleanup,
|
||||||
|
backoff: defaultBackoffConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
c.ctx, c.cancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
return c
|
||||||
|
}
|
465
balancer/xds/xds_client_test.go
Normal file
465
balancer/xds/xds_client_test.go
Normal file
@ -0,0 +1,465 @@
|
|||||||
|
/*
|
||||||
|
*
|
||||||
|
* Copyright 2019 gRPC authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package xds
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
||||||
|
xdscorepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
|
||||||
|
xdsendpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
|
||||||
|
xdsdiscoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
|
||||||
|
"github.com/gogo/protobuf/proto"
|
||||||
|
"github.com/gogo/protobuf/types"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/balancer"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
testServiceName = "test/foo"
|
||||||
|
testCDSReq = &xdspb.DiscoveryRequest{
|
||||||
|
Node: &xdscorepb.Node{
|
||||||
|
Metadata: &types.Struct{
|
||||||
|
Fields: map[string]*types.Value{
|
||||||
|
grpcHostname: {
|
||||||
|
Kind: &types.Value_StringValue{StringValue: testServiceName},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
TypeUrl: cdsType,
|
||||||
|
}
|
||||||
|
testEDSReq = &xdspb.DiscoveryRequest{
|
||||||
|
Node: &xdscorepb.Node{
|
||||||
|
Metadata: &types.Struct{
|
||||||
|
Fields: map[string]*types.Value{
|
||||||
|
endpointRequired: {
|
||||||
|
Kind: &types.Value_BoolValue{BoolValue: true},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ResourceNames: []string{testServiceName},
|
||||||
|
TypeUrl: edsType,
|
||||||
|
}
|
||||||
|
testEDSReqWithoutEndpoints = &xdspb.DiscoveryRequest{
|
||||||
|
Node: &xdscorepb.Node{
|
||||||
|
Metadata: &types.Struct{
|
||||||
|
Fields: map[string]*types.Value{
|
||||||
|
endpointRequired: {
|
||||||
|
Kind: &types.Value_BoolValue{BoolValue: false},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ResourceNames: []string{testServiceName},
|
||||||
|
TypeUrl: edsType,
|
||||||
|
}
|
||||||
|
testCluster = &xdspb.Cluster{
|
||||||
|
Name: testServiceName,
|
||||||
|
Type: xdspb.Cluster_EDS,
|
||||||
|
LbPolicy: xdspb.Cluster_ROUND_ROBIN,
|
||||||
|
}
|
||||||
|
marshaledCluster, _ = proto.Marshal(testCluster)
|
||||||
|
testCDSResp = &xdspb.DiscoveryResponse{
|
||||||
|
Resources: []types.Any{
|
||||||
|
{
|
||||||
|
TypeUrl: cdsType,
|
||||||
|
Value: marshaledCluster,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
TypeUrl: cdsType,
|
||||||
|
}
|
||||||
|
testClusterLoadAssignment = &xdspb.ClusterLoadAssignment{
|
||||||
|
ClusterName: testServiceName,
|
||||||
|
Endpoints: []xdsendpointpb.LocalityLbEndpoints{
|
||||||
|
{
|
||||||
|
Locality: &xdscorepb.Locality{
|
||||||
|
Region: "asia-east1",
|
||||||
|
Zone: "1",
|
||||||
|
SubZone: "sa",
|
||||||
|
},
|
||||||
|
LbEndpoints: []xdsendpointpb.LbEndpoint{
|
||||||
|
{
|
||||||
|
HostIdentifier: &xdsendpointpb.LbEndpoint_Endpoint{
|
||||||
|
Endpoint: &xdsendpointpb.Endpoint{
|
||||||
|
Address: &xdscorepb.Address{
|
||||||
|
Address: &xdscorepb.Address_SocketAddress{
|
||||||
|
SocketAddress: &xdscorepb.SocketAddress{
|
||||||
|
Address: "1.1.1.1",
|
||||||
|
PortSpecifier: &xdscorepb.SocketAddress_PortValue{
|
||||||
|
PortValue: 10001,
|
||||||
|
},
|
||||||
|
ResolverName: "dns",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
HealthCheckConfig: nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Metadata: &xdscorepb.Metadata{
|
||||||
|
FilterMetadata: map[string]*types.Struct{
|
||||||
|
"xx.lb": {
|
||||||
|
Fields: map[string]*types.Value{
|
||||||
|
"endpoint_name": {
|
||||||
|
Kind: &types.Value_StringValue{
|
||||||
|
StringValue: "some.endpoint.name",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
LoadBalancingWeight: &types.UInt32Value{
|
||||||
|
Value: 1,
|
||||||
|
},
|
||||||
|
Priority: 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
marshaledClusterLoadAssignment, _ = proto.Marshal(testClusterLoadAssignment)
|
||||||
|
testEDSResp = &xdspb.DiscoveryResponse{
|
||||||
|
Resources: []types.Any{
|
||||||
|
{
|
||||||
|
TypeUrl: edsType,
|
||||||
|
Value: marshaledClusterLoadAssignment,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
TypeUrl: edsType,
|
||||||
|
}
|
||||||
|
testClusterLoadAssignmentWithoutEndpoints = &xdspb.ClusterLoadAssignment{
|
||||||
|
ClusterName: testServiceName,
|
||||||
|
Endpoints: []xdsendpointpb.LocalityLbEndpoints{
|
||||||
|
{
|
||||||
|
Locality: &xdscorepb.Locality{
|
||||||
|
SubZone: "sa",
|
||||||
|
},
|
||||||
|
LoadBalancingWeight: &types.UInt32Value{
|
||||||
|
Value: 128,
|
||||||
|
},
|
||||||
|
Priority: 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Policy: nil,
|
||||||
|
}
|
||||||
|
marshaledClusterLoadAssignmentWithoutEndpoints, _ = proto.Marshal(testClusterLoadAssignmentWithoutEndpoints)
|
||||||
|
testEDSRespWithoutEndpoints = &xdspb.DiscoveryResponse{
|
||||||
|
Resources: []types.Any{
|
||||||
|
{
|
||||||
|
TypeUrl: edsType,
|
||||||
|
Value: marshaledClusterLoadAssignmentWithoutEndpoints,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
TypeUrl: edsType,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
type testTrafficDirector struct {
|
||||||
|
reqChan chan *request
|
||||||
|
respChan chan *response
|
||||||
|
}
|
||||||
|
|
||||||
|
type request struct {
|
||||||
|
req *xdspb.DiscoveryRequest
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
type response struct {
|
||||||
|
resp *xdspb.DiscoveryResponse
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ttd *testTrafficDirector) StreamAggregatedResources(s xdsdiscoverypb.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
|
||||||
|
for {
|
||||||
|
req, err := s.Recv()
|
||||||
|
if err != nil {
|
||||||
|
ttd.reqChan <- &request{
|
||||||
|
req: nil,
|
||||||
|
err: err,
|
||||||
|
}
|
||||||
|
if err == io.EOF {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
ttd.reqChan <- &request{
|
||||||
|
req: req,
|
||||||
|
err: nil,
|
||||||
|
}
|
||||||
|
if req.TypeUrl == edsType {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case resp := <-ttd.respChan:
|
||||||
|
if resp.err != nil {
|
||||||
|
return resp.err
|
||||||
|
}
|
||||||
|
if err := s.Send(resp.resp); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case <-s.Context().Done():
|
||||||
|
return s.Context().Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ttd *testTrafficDirector) IncrementalAggregatedResources(xdsdiscoverypb.AggregatedDiscoveryService_IncrementalAggregatedResourcesServer) error {
|
||||||
|
return status.Error(codes.Unimplemented, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ttd *testTrafficDirector) sendResp(resp *response) {
|
||||||
|
ttd.respChan <- resp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ttd *testTrafficDirector) getReq() *request {
|
||||||
|
return <-ttd.reqChan
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestTrafficDirector() *testTrafficDirector {
|
||||||
|
return &testTrafficDirector{
|
||||||
|
reqChan: make(chan *request, 10),
|
||||||
|
respChan: make(chan *response, 10),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type testConfig struct {
|
||||||
|
doCDS bool
|
||||||
|
expectedRequests []*xdspb.DiscoveryRequest
|
||||||
|
responsesToSend []*xdspb.DiscoveryResponse
|
||||||
|
expectedADSResponses []proto.Message
|
||||||
|
adsErr error
|
||||||
|
svrErr error
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupServer(t *testing.T) (addr string, td *testTrafficDirector, cleanup func()) {
|
||||||
|
lis, err := net.Listen("tcp", "localhost:0")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("listen failed due to: %v", err)
|
||||||
|
}
|
||||||
|
svr := grpc.NewServer()
|
||||||
|
td = newTestTrafficDirector()
|
||||||
|
xdsdiscoverypb.RegisterAggregatedDiscoveryServiceServer(svr, td)
|
||||||
|
go svr.Serve(lis)
|
||||||
|
return lis.Addr().String(), td, func() {
|
||||||
|
svr.Stop()
|
||||||
|
lis.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s) TestXdsClientResponseHandling(t *testing.T) {
|
||||||
|
for _, test := range []*testConfig{
|
||||||
|
{
|
||||||
|
doCDS: true,
|
||||||
|
expectedRequests: []*xdspb.DiscoveryRequest{testCDSReq, testEDSReq},
|
||||||
|
responsesToSend: []*xdspb.DiscoveryResponse{testCDSResp, testEDSResp},
|
||||||
|
expectedADSResponses: []proto.Message{testCluster, testClusterLoadAssignment},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
doCDS: false,
|
||||||
|
expectedRequests: []*xdspb.DiscoveryRequest{testEDSReqWithoutEndpoints},
|
||||||
|
responsesToSend: []*xdspb.DiscoveryResponse{testEDSRespWithoutEndpoints},
|
||||||
|
expectedADSResponses: []proto.Message{testClusterLoadAssignmentWithoutEndpoints},
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
testXdsClientResponseHandling(t, test)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testXdsClientResponseHandling(t *testing.T, test *testConfig) {
|
||||||
|
addr, td, cleanup := setupServer(t)
|
||||||
|
defer cleanup()
|
||||||
|
adsChan := make(chan proto.Message, 10)
|
||||||
|
newADS := func(ctx context.Context, i proto.Message) error {
|
||||||
|
adsChan <- i
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, func(context.Context) {}, func() {})
|
||||||
|
defer client.close()
|
||||||
|
go client.run()
|
||||||
|
|
||||||
|
for _, expectedReq := range test.expectedRequests {
|
||||||
|
req := td.getReq()
|
||||||
|
if req.err != nil {
|
||||||
|
t.Fatalf("ads RPC failed with err: %v", req.err)
|
||||||
|
}
|
||||||
|
if !proto.Equal(req.req, expectedReq) {
|
||||||
|
t.Fatalf("got ADS request %T %v, expected: %T %v", req.req, req.req, expectedReq, expectedReq)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, resp := range test.responsesToSend {
|
||||||
|
td.sendResp(&response{resp: resp})
|
||||||
|
ads := <-adsChan
|
||||||
|
if !proto.Equal(ads, test.expectedADSResponses[i]) {
|
||||||
|
t.Fatalf("received unexpected ads response, got %v, want %v", ads, test.expectedADSResponses[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s) TestXdsClientLoseContact(t *testing.T) {
|
||||||
|
for _, test := range []*testConfig{
|
||||||
|
{
|
||||||
|
doCDS: true,
|
||||||
|
responsesToSend: []*xdspb.DiscoveryResponse{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
doCDS: false,
|
||||||
|
responsesToSend: []*xdspb.DiscoveryResponse{testEDSRespWithoutEndpoints},
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
testXdsClientLoseContactRemoteClose(t, test)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range []*testConfig{
|
||||||
|
{
|
||||||
|
doCDS: false,
|
||||||
|
responsesToSend: []*xdspb.DiscoveryResponse{testCDSResp}, // CDS response when in custom mode.
|
||||||
|
},
|
||||||
|
{
|
||||||
|
doCDS: true,
|
||||||
|
responsesToSend: []*xdspb.DiscoveryResponse{{}}, // response with 0 resources is an error case.
|
||||||
|
},
|
||||||
|
{
|
||||||
|
doCDS: true,
|
||||||
|
responsesToSend: []*xdspb.DiscoveryResponse{testCDSResp},
|
||||||
|
adsErr: errors.New("some ads parsing error from xdsBalancer"),
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
testXdsClientLoseContactADSRelatedErrorOccur(t, test)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testXdsClientLoseContactRemoteClose(t *testing.T, test *testConfig) {
|
||||||
|
addr, td, cleanup := setupServer(t)
|
||||||
|
defer cleanup()
|
||||||
|
adsChan := make(chan proto.Message, 10)
|
||||||
|
newADS := func(ctx context.Context, i proto.Message) error {
|
||||||
|
adsChan <- i
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
contactChan := make(chan *loseContact, 10)
|
||||||
|
loseContactFunc := func(context.Context) {
|
||||||
|
contactChan <- &loseContact{}
|
||||||
|
}
|
||||||
|
client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {})
|
||||||
|
defer client.close()
|
||||||
|
go client.run()
|
||||||
|
|
||||||
|
// make sure server side get the request (i.e stream created successfully on client side)
|
||||||
|
td.getReq()
|
||||||
|
|
||||||
|
for _, resp := range test.responsesToSend {
|
||||||
|
td.sendResp(&response{resp: resp})
|
||||||
|
// make sure client side receives it
|
||||||
|
<-adsChan
|
||||||
|
}
|
||||||
|
cleanup()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-contactChan:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("time out when expecting lost contact signal")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testXdsClientLoseContactADSRelatedErrorOccur(t *testing.T, test *testConfig) {
|
||||||
|
addr, td, cleanup := setupServer(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
adsChan := make(chan proto.Message, 10)
|
||||||
|
newADS := func(ctx context.Context, i proto.Message) error {
|
||||||
|
adsChan <- i
|
||||||
|
return test.adsErr
|
||||||
|
}
|
||||||
|
contactChan := make(chan *loseContact, 10)
|
||||||
|
loseContactFunc := func(context.Context) {
|
||||||
|
contactChan <- &loseContact{}
|
||||||
|
}
|
||||||
|
client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {})
|
||||||
|
defer client.close()
|
||||||
|
go client.run()
|
||||||
|
|
||||||
|
// make sure server side get the request (i.e stream created successfully on client side)
|
||||||
|
td.getReq()
|
||||||
|
|
||||||
|
for _, resp := range test.responsesToSend {
|
||||||
|
td.sendResp(&response{resp: resp})
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-contactChan:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("time out when expecting lost contact signal")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s) TestXdsClientExponentialRetry(t *testing.T) {
|
||||||
|
cfg := &testConfig{
|
||||||
|
svrErr: status.Errorf(codes.Aborted, "abort the stream to trigger retry"),
|
||||||
|
}
|
||||||
|
addr, td, cleanup := setupServer(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
adsChan := make(chan proto.Message, 10)
|
||||||
|
newADS := func(ctx context.Context, i proto.Message) error {
|
||||||
|
adsChan <- i
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
contactChan := make(chan *loseContact, 10)
|
||||||
|
loseContactFunc := func(context.Context) {
|
||||||
|
contactChan <- &loseContact{}
|
||||||
|
}
|
||||||
|
client := newXDSClient(addr, testServiceName, cfg.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {})
|
||||||
|
defer client.close()
|
||||||
|
go client.run()
|
||||||
|
|
||||||
|
var secondRetry, thirdRetry time.Time
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
// make sure server side get the request (i.e stream created successfully on client side)
|
||||||
|
td.getReq()
|
||||||
|
td.sendResp(&response{err: cfg.svrErr})
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-contactChan:
|
||||||
|
if i == 1 {
|
||||||
|
secondRetry = time.Now()
|
||||||
|
}
|
||||||
|
if i == 2 {
|
||||||
|
thirdRetry = time.Now()
|
||||||
|
}
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("time out when expecting lost contact signal")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if thirdRetry.Sub(secondRetry) < 1*time.Second {
|
||||||
|
t.Fatalf("interval between second and third retry is %v, expected > 1s", thirdRetry.Sub(secondRetry))
|
||||||
|
}
|
||||||
|
}
|
706
balancer/xds/xds_test.go
Normal file
706
balancer/xds/xds_test.go
Normal file
@ -0,0 +1,706 @@
|
|||||||
|
/*
|
||||||
|
*
|
||||||
|
* Copyright 2019 gRPC authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package xds
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"reflect"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"google.golang.org/grpc/internal/grpctest"
|
||||||
|
"google.golang.org/grpc/internal/leakcheck"
|
||||||
|
|
||||||
|
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
||||||
|
"google.golang.org/grpc/balancer"
|
||||||
|
"google.golang.org/grpc/balancer/xds/edsbalancer"
|
||||||
|
"google.golang.org/grpc/connectivity"
|
||||||
|
"google.golang.org/grpc/resolver"
|
||||||
|
)
|
||||||
|
|
||||||
|
var lbABuilder *balancerABuilder
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
lbABuilder = &balancerABuilder{}
|
||||||
|
balancer.Register(lbABuilder)
|
||||||
|
balancer.Register(&balancerBBuilder{})
|
||||||
|
}
|
||||||
|
|
||||||
|
type s struct{}
|
||||||
|
|
||||||
|
func (s) Teardown(t *testing.T) {
|
||||||
|
leakcheck.Check(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test(t *testing.T) {
|
||||||
|
grpctest.RunSubTests(t, s{})
|
||||||
|
}
|
||||||
|
|
||||||
|
type lbPolicy string
|
||||||
|
|
||||||
|
const (
|
||||||
|
fakeBalancerA lbPolicy = "fake_balancer_A"
|
||||||
|
fakeBalancerB lbPolicy = "fake_balancer_B"
|
||||||
|
fakeBalancerC lbPolicy = "fake_balancer_C"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
testBalancerNameFooBar = "foo.bar"
|
||||||
|
testBalancerConfigFooBar, _ = json.Marshal(&testBalancerConfig{
|
||||||
|
BalancerName: testBalancerNameFooBar,
|
||||||
|
ChildPolicy: []lbPolicy{fakeBalancerA},
|
||||||
|
FallbackPolicy: []lbPolicy{fakeBalancerA},
|
||||||
|
})
|
||||||
|
specialAddrForBalancerA = resolver.Address{Addr: "this.is.balancer.A"}
|
||||||
|
specialAddrForBalancerB = resolver.Address{Addr: "this.is.balancer.B"}
|
||||||
|
|
||||||
|
// mu protects the access of latestFakeEdsBalancer
|
||||||
|
mu sync.Mutex
|
||||||
|
latestFakeEdsBalancer *fakeEDSBalancer
|
||||||
|
)
|
||||||
|
|
||||||
|
type testBalancerConfig struct {
|
||||||
|
BalancerName string
|
||||||
|
ChildPolicy []lbPolicy
|
||||||
|
FallbackPolicy []lbPolicy
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *lbPolicy) UnmarshalJSON(b []byte) error {
|
||||||
|
// no need to implement, not used.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l lbPolicy) MarshalJSON() ([]byte, error) {
|
||||||
|
m := make(map[string]struct{})
|
||||||
|
m[string(l)] = struct{}{}
|
||||||
|
return json.Marshal(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
type balancerABuilder struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
lastBalancer *balancerA
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *balancerABuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
|
||||||
|
b.mu.Lock()
|
||||||
|
b.lastBalancer = &balancerA{cc: cc, subconnStateChange: make(chan *scStateChange, 10)}
|
||||||
|
b.mu.Unlock()
|
||||||
|
return b.lastBalancer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *balancerABuilder) Name() string {
|
||||||
|
return string(fakeBalancerA)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *balancerABuilder) getLastBalancer() *balancerA {
|
||||||
|
b.mu.Lock()
|
||||||
|
defer b.mu.Unlock()
|
||||||
|
return b.lastBalancer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *balancerABuilder) clearLastBalancer() {
|
||||||
|
b.mu.Lock()
|
||||||
|
defer b.mu.Unlock()
|
||||||
|
b.lastBalancer = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type balancerBBuilder struct{}
|
||||||
|
|
||||||
|
func (b *balancerBBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
|
||||||
|
return &balancerB{cc: cc}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*balancerBBuilder) Name() string {
|
||||||
|
return string(fakeBalancerB)
|
||||||
|
}
|
||||||
|
|
||||||
|
type balancerA struct {
|
||||||
|
cc balancer.ClientConn
|
||||||
|
subconnStateChange chan *scStateChange
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *balancerA) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
|
||||||
|
b.subconnStateChange <- &scStateChange{sc: sc, state: state}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *balancerA) HandleResolvedAddrs(addrs []resolver.Address, err error) {
|
||||||
|
_, _ = b.cc.NewSubConn(append(addrs, specialAddrForBalancerA), balancer.NewSubConnOptions{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *balancerA) Close() {}
|
||||||
|
|
||||||
|
type balancerB struct {
|
||||||
|
cc balancer.ClientConn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (balancerB) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *balancerB) HandleResolvedAddrs(addrs []resolver.Address, err error) {
|
||||||
|
_, _ = b.cc.NewSubConn(append(addrs, specialAddrForBalancerB), balancer.NewSubConnOptions{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (balancerB) Close() {}
|
||||||
|
|
||||||
|
func newTestClientConn() *testClientConn {
|
||||||
|
return &testClientConn{
|
||||||
|
newSubConns: make(chan []resolver.Address, 10),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type testClientConn struct {
|
||||||
|
newSubConns chan []resolver.Address
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
|
||||||
|
t.newSubConns <- addrs
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (testClientConn) RemoveSubConn(balancer.SubConn) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (testClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (testClientConn) ResolveNow(resolver.ResolveNowOption) {}
|
||||||
|
|
||||||
|
func (testClientConn) Target() string {
|
||||||
|
return testServiceName
|
||||||
|
}
|
||||||
|
|
||||||
|
type scStateChange struct {
|
||||||
|
sc balancer.SubConn
|
||||||
|
state connectivity.State
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeEDSBalancer struct {
|
||||||
|
cc balancer.ClientConn
|
||||||
|
edsChan chan *xdspb.ClusterLoadAssignment
|
||||||
|
childPolicy chan *loadBalancingConfig
|
||||||
|
fallbackPolicy chan *loadBalancingConfig
|
||||||
|
subconnStateChange chan *scStateChange
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
|
||||||
|
f.subconnStateChange <- &scStateChange{sc: sc, state: state}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeEDSBalancer) HandleResolvedAddrs([]resolver.Address, error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeEDSBalancer) Close() {
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
latestFakeEdsBalancer = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment) {
|
||||||
|
f.edsChan <- edsResp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) {
|
||||||
|
f.childPolicy <- &loadBalancingConfig{
|
||||||
|
Name: name,
|
||||||
|
Config: config,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newFakeEDSBalancer(cc balancer.ClientConn) interface{} {
|
||||||
|
lb := &fakeEDSBalancer{
|
||||||
|
cc: cc,
|
||||||
|
edsChan: make(chan *xdspb.ClusterLoadAssignment, 10),
|
||||||
|
childPolicy: make(chan *loadBalancingConfig, 10),
|
||||||
|
fallbackPolicy: make(chan *loadBalancingConfig, 10),
|
||||||
|
subconnStateChange: make(chan *scStateChange, 10),
|
||||||
|
}
|
||||||
|
mu.Lock()
|
||||||
|
latestFakeEdsBalancer = lb
|
||||||
|
mu.Unlock()
|
||||||
|
return lb
|
||||||
|
}
|
||||||
|
|
||||||
|
func getLatestEdsBalancer() *fakeEDSBalancer {
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
return latestFakeEdsBalancer
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeSubConn struct{}
|
||||||
|
|
||||||
|
func (*fakeSubConn) UpdateAddresses([]resolver.Address) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*fakeSubConn) Connect() {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s) TestXdsBalanceHandleResolvedAddrs(t *testing.T) {
|
||||||
|
startupTimeout = 500 * time.Millisecond
|
||||||
|
defer func() { startupTimeout = defaultTimeout }()
|
||||||
|
|
||||||
|
builder := balancer.Get("xds")
|
||||||
|
cc := newTestClientConn()
|
||||||
|
lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("unable to type assert to *xdsBalancer")
|
||||||
|
}
|
||||||
|
defer lb.Close()
|
||||||
|
if err := lb.HandleBalancerConfig(json.RawMessage(testBalancerConfigFooBar)); err != nil {
|
||||||
|
t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(testBalancerConfigFooBar), err)
|
||||||
|
}
|
||||||
|
addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
lb.HandleResolvedAddrs(addrs, nil)
|
||||||
|
select {
|
||||||
|
case nsc := <-cc.newSubConns:
|
||||||
|
if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) {
|
||||||
|
t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA))
|
||||||
|
}
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatalf("timeout when geting new subconn result")
|
||||||
|
}
|
||||||
|
addrs = addrs[:2-i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) {
|
||||||
|
startupTimeout = 500 * time.Millisecond
|
||||||
|
newEDSBalancer = newFakeEDSBalancer
|
||||||
|
defer func() {
|
||||||
|
startupTimeout = defaultTimeout
|
||||||
|
newEDSBalancer = edsbalancer.NewXDSBalancer
|
||||||
|
}()
|
||||||
|
|
||||||
|
builder := balancer.Get("xds")
|
||||||
|
cc := newTestClientConn()
|
||||||
|
lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("unable to type assert to *xdsBalancer")
|
||||||
|
}
|
||||||
|
defer lb.Close()
|
||||||
|
if err := lb.HandleBalancerConfig(json.RawMessage(testBalancerConfigFooBar)); err != nil {
|
||||||
|
t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(testBalancerConfigFooBar), err)
|
||||||
|
}
|
||||||
|
addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
|
||||||
|
lb.HandleResolvedAddrs(addrs, nil)
|
||||||
|
|
||||||
|
// verify fallback takes over
|
||||||
|
select {
|
||||||
|
case nsc := <-cc.newSubConns:
|
||||||
|
if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) {
|
||||||
|
t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA))
|
||||||
|
}
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatalf("timeout when geting new subconn result")
|
||||||
|
}
|
||||||
|
|
||||||
|
var cleanups []func()
|
||||||
|
defer func() {
|
||||||
|
for _, cleanup := range cleanups {
|
||||||
|
cleanup()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
// In the first iteration, an eds balancer takes over fallback balancer
|
||||||
|
// In the second iteration, a new xds client takes over previous one.
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
addr, td, cleanup := setupServer(t)
|
||||||
|
cleanups = append(cleanups, cleanup)
|
||||||
|
workingBalancerConfig, _ := json.Marshal(&testBalancerConfig{
|
||||||
|
BalancerName: addr,
|
||||||
|
ChildPolicy: []lbPolicy{fakeBalancerA},
|
||||||
|
FallbackPolicy: []lbPolicy{fakeBalancerA},
|
||||||
|
})
|
||||||
|
|
||||||
|
if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
|
||||||
|
t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
|
||||||
|
}
|
||||||
|
td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
|
||||||
|
|
||||||
|
var j int
|
||||||
|
for j = 0; j < 10; j++ {
|
||||||
|
if edsLB := getLatestEdsBalancer(); edsLB != nil { // edsLB won't change between the two iterations
|
||||||
|
select {
|
||||||
|
case gotEDS := <-edsLB.edsChan:
|
||||||
|
if !reflect.DeepEqual(gotEDS, testClusterLoadAssignmentWithoutEndpoints) {
|
||||||
|
t.Fatalf("edsBalancer got eds: %v, want %v", gotEDS, testClusterLoadAssignmentWithoutEndpoints)
|
||||||
|
}
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("haven't got EDS update after 1s")
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
if j == 10 {
|
||||||
|
t.Fatal("edsBalancer instance has not been created or updated after 1s")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// switch child policy, lb stays the same
|
||||||
|
// cds->eds or eds -> cds, restart xdsClient, lb stays the same
|
||||||
|
func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) {
|
||||||
|
newEDSBalancer = newFakeEDSBalancer
|
||||||
|
defer func() {
|
||||||
|
newEDSBalancer = edsbalancer.NewXDSBalancer
|
||||||
|
}()
|
||||||
|
|
||||||
|
builder := balancer.Get("xds")
|
||||||
|
cc := newTestClientConn()
|
||||||
|
lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("unable to type assert to *xdsBalancer")
|
||||||
|
}
|
||||||
|
defer lb.Close()
|
||||||
|
|
||||||
|
var cleanups []func()
|
||||||
|
defer func() {
|
||||||
|
for _, cleanup := range cleanups {
|
||||||
|
cleanup()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
for _, test := range []struct {
|
||||||
|
cfg *testBalancerConfig
|
||||||
|
responseToSend *xdspb.DiscoveryResponse
|
||||||
|
expectedChildPolicy *loadBalancingConfig
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
cfg: &testBalancerConfig{
|
||||||
|
ChildPolicy: []lbPolicy{fakeBalancerA},
|
||||||
|
},
|
||||||
|
responseToSend: testEDSRespWithoutEndpoints,
|
||||||
|
expectedChildPolicy: &loadBalancingConfig{
|
||||||
|
Name: string(fakeBalancerA),
|
||||||
|
Config: json.RawMessage(`{}`),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
cfg: &testBalancerConfig{
|
||||||
|
ChildPolicy: []lbPolicy{fakeBalancerB},
|
||||||
|
},
|
||||||
|
expectedChildPolicy: &loadBalancingConfig{
|
||||||
|
Name: string(fakeBalancerB),
|
||||||
|
Config: json.RawMessage(`{}`),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
cfg: &testBalancerConfig{},
|
||||||
|
responseToSend: testCDSResp,
|
||||||
|
expectedChildPolicy: &loadBalancingConfig{
|
||||||
|
Name: "ROUND_ROBIN",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
addr, td, cleanup := setupServer(t)
|
||||||
|
cleanups = append(cleanups, cleanup)
|
||||||
|
test.cfg.BalancerName = addr
|
||||||
|
workingBalancerConfig, _ := json.Marshal(test.cfg)
|
||||||
|
|
||||||
|
if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
|
||||||
|
t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
|
||||||
|
}
|
||||||
|
if test.responseToSend != nil {
|
||||||
|
td.sendResp(&response{resp: test.responseToSend})
|
||||||
|
}
|
||||||
|
var i int
|
||||||
|
for i = 0; i < 10; i++ {
|
||||||
|
if edsLB := getLatestEdsBalancer(); edsLB != nil {
|
||||||
|
select {
|
||||||
|
case childPolicy := <-edsLB.childPolicy:
|
||||||
|
if !reflect.DeepEqual(childPolicy, test.expectedChildPolicy) {
|
||||||
|
t.Fatalf("got childPolicy %v, want %v", childPolicy, test.expectedChildPolicy)
|
||||||
|
}
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("haven't got policy update after 1s")
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
if i == 10 {
|
||||||
|
t.Fatal("edsBalancer instance has not been created or updated after 1s")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// not in fallback mode, overwrite fallback info.
|
||||||
|
// in fallback mode, update config or switch balancer.
|
||||||
|
func (s) TestXdsBalanceHandleBalancerConfigFallbackUpdate(t *testing.T) {
|
||||||
|
newEDSBalancer = newFakeEDSBalancer
|
||||||
|
defer func() {
|
||||||
|
newEDSBalancer = edsbalancer.NewXDSBalancer
|
||||||
|
}()
|
||||||
|
builder := balancer.Get("xds")
|
||||||
|
cc := newTestClientConn()
|
||||||
|
lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("unable to type assert to *xdsBalancer")
|
||||||
|
}
|
||||||
|
defer lb.Close()
|
||||||
|
|
||||||
|
addr, td, cleanup := setupServer(t)
|
||||||
|
|
||||||
|
cfg := &testBalancerConfig{
|
||||||
|
BalancerName: addr,
|
||||||
|
ChildPolicy: []lbPolicy{fakeBalancerA},
|
||||||
|
FallbackPolicy: []lbPolicy{fakeBalancerA},
|
||||||
|
}
|
||||||
|
workingBalancerConfig, _ := json.Marshal(cfg)
|
||||||
|
|
||||||
|
if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
|
||||||
|
t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.FallbackPolicy = []lbPolicy{fakeBalancerB}
|
||||||
|
workingBalancerConfig, _ = json.Marshal(cfg)
|
||||||
|
|
||||||
|
if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
|
||||||
|
t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
|
||||||
|
|
||||||
|
var i int
|
||||||
|
for i = 0; i < 10; i++ {
|
||||||
|
if edsLB := getLatestEdsBalancer(); edsLB != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
if i == 10 {
|
||||||
|
t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s")
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanup()
|
||||||
|
|
||||||
|
addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
|
||||||
|
lb.HandleResolvedAddrs(addrs, nil)
|
||||||
|
|
||||||
|
// verify fallback balancer B takes over
|
||||||
|
select {
|
||||||
|
case nsc := <-cc.newSubConns:
|
||||||
|
if !reflect.DeepEqual(append(addrs, specialAddrForBalancerB), nsc) {
|
||||||
|
t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerB))
|
||||||
|
}
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("timeout when geting new subconn result")
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.FallbackPolicy = []lbPolicy{fakeBalancerA}
|
||||||
|
workingBalancerConfig, _ = json.Marshal(cfg)
|
||||||
|
if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
|
||||||
|
t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify fallback balancer A takes over
|
||||||
|
select {
|
||||||
|
case nsc := <-cc.newSubConns:
|
||||||
|
if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) {
|
||||||
|
t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA))
|
||||||
|
}
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatalf("timeout when geting new subconn result")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) {
|
||||||
|
newEDSBalancer = newFakeEDSBalancer
|
||||||
|
defer func() {
|
||||||
|
newEDSBalancer = edsbalancer.NewXDSBalancer
|
||||||
|
}()
|
||||||
|
builder := balancer.Get("xds")
|
||||||
|
cc := newTestClientConn()
|
||||||
|
lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("unable to type assert to *xdsBalancer")
|
||||||
|
}
|
||||||
|
defer lb.Close()
|
||||||
|
|
||||||
|
addr, td, cleanup := setupServer(t)
|
||||||
|
defer cleanup()
|
||||||
|
cfg := &testBalancerConfig{
|
||||||
|
BalancerName: addr,
|
||||||
|
ChildPolicy: []lbPolicy{fakeBalancerA},
|
||||||
|
FallbackPolicy: []lbPolicy{fakeBalancerA},
|
||||||
|
}
|
||||||
|
workingBalancerConfig, _ := json.Marshal(cfg)
|
||||||
|
|
||||||
|
if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
|
||||||
|
t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
|
||||||
|
|
||||||
|
expectedScStateChange := &scStateChange{
|
||||||
|
sc: &fakeSubConn{},
|
||||||
|
state: connectivity.Ready,
|
||||||
|
}
|
||||||
|
|
||||||
|
var i int
|
||||||
|
for i = 0; i < 10; i++ {
|
||||||
|
if edsLB := getLatestEdsBalancer(); edsLB != nil {
|
||||||
|
lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state)
|
||||||
|
select {
|
||||||
|
case scsc := <-edsLB.subconnStateChange:
|
||||||
|
if !reflect.DeepEqual(scsc, expectedScStateChange) {
|
||||||
|
t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
|
||||||
|
}
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("haven't got subconn state change after 1s")
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
if i == 10 {
|
||||||
|
t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s")
|
||||||
|
}
|
||||||
|
|
||||||
|
// lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record
|
||||||
|
// to make sure there's a new one created and get the pointer to it.
|
||||||
|
lbABuilder.clearLastBalancer()
|
||||||
|
cleanup()
|
||||||
|
|
||||||
|
// switch to fallback
|
||||||
|
// fallback balancer A takes over
|
||||||
|
for i = 0; i < 10; i++ {
|
||||||
|
if fblb := lbABuilder.getLastBalancer(); fblb != nil {
|
||||||
|
lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state)
|
||||||
|
select {
|
||||||
|
case scsc := <-fblb.subconnStateChange:
|
||||||
|
if !reflect.DeepEqual(scsc, expectedScStateChange) {
|
||||||
|
t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
|
||||||
|
}
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("haven't got subconn state change after 1s")
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
if i == 10 {
|
||||||
|
t.Fatal("balancerA instance has not been created after 1s")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s) TestXdsBalancerFallbackSignalFromEdsBalancer(t *testing.T) {
|
||||||
|
newEDSBalancer = newFakeEDSBalancer
|
||||||
|
defer func() {
|
||||||
|
newEDSBalancer = edsbalancer.NewXDSBalancer
|
||||||
|
}()
|
||||||
|
builder := balancer.Get("xds")
|
||||||
|
cc := newTestClientConn()
|
||||||
|
lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("unable to type assert to *xdsBalancer")
|
||||||
|
}
|
||||||
|
defer lb.Close()
|
||||||
|
|
||||||
|
addr, td, cleanup := setupServer(t)
|
||||||
|
defer cleanup()
|
||||||
|
cfg := &testBalancerConfig{
|
||||||
|
BalancerName: addr,
|
||||||
|
ChildPolicy: []lbPolicy{fakeBalancerA},
|
||||||
|
FallbackPolicy: []lbPolicy{fakeBalancerA},
|
||||||
|
}
|
||||||
|
workingBalancerConfig, _ := json.Marshal(cfg)
|
||||||
|
|
||||||
|
if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
|
||||||
|
t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
|
||||||
|
|
||||||
|
expectedScStateChange := &scStateChange{
|
||||||
|
sc: &fakeSubConn{},
|
||||||
|
state: connectivity.Ready,
|
||||||
|
}
|
||||||
|
|
||||||
|
var i int
|
||||||
|
for i = 0; i < 10; i++ {
|
||||||
|
if edsLB := getLatestEdsBalancer(); edsLB != nil {
|
||||||
|
lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state)
|
||||||
|
select {
|
||||||
|
case scsc := <-edsLB.subconnStateChange:
|
||||||
|
if !reflect.DeepEqual(scsc, expectedScStateChange) {
|
||||||
|
t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
|
||||||
|
}
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("haven't got subconn state change after 1s")
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
if i == 10 {
|
||||||
|
t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s")
|
||||||
|
}
|
||||||
|
|
||||||
|
// lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record
|
||||||
|
// to make sure there's a new one created and get the pointer to it.
|
||||||
|
lbABuilder.clearLastBalancer()
|
||||||
|
cleanup()
|
||||||
|
|
||||||
|
// switch to fallback
|
||||||
|
// fallback balancer A takes over
|
||||||
|
for i = 0; i < 10; i++ {
|
||||||
|
if fblb := lbABuilder.getLastBalancer(); fblb != nil {
|
||||||
|
lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state)
|
||||||
|
select {
|
||||||
|
case scsc := <-fblb.subconnStateChange:
|
||||||
|
if !reflect.DeepEqual(scsc, expectedScStateChange) {
|
||||||
|
t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
|
||||||
|
}
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("haven't got subconn state change after 1s")
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
if i == 10 {
|
||||||
|
t.Fatal("balancerA instance has not been created after 1s")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s) TestXdsBalancerConfigParsingSelectingLBPolicy(t *testing.T) {
|
||||||
|
tesCfg := &testBalancerConfig{
|
||||||
|
BalancerName: "fake.foo.bar",
|
||||||
|
ChildPolicy: []lbPolicy{fakeBalancerC, fakeBalancerA, fakeBalancerB}, // selects fakeBalancerA
|
||||||
|
FallbackPolicy: []lbPolicy{fakeBalancerC, fakeBalancerB, fakeBalancerA}, // selects fakeBalancerB
|
||||||
|
}
|
||||||
|
js, _ := json.Marshal(tesCfg)
|
||||||
|
var xdsCfg xdsConfig
|
||||||
|
if err := json.Unmarshal(js, &xdsCfg); err != nil {
|
||||||
|
t.Fatal("unable to unmarshal balancer config into xds config")
|
||||||
|
}
|
||||||
|
wantChildPolicy := &loadBalancingConfig{Name: string(fakeBalancerA), Config: json.RawMessage(`{}`)}
|
||||||
|
if !reflect.DeepEqual(xdsCfg.ChildPolicy, wantChildPolicy) {
|
||||||
|
t.Fatalf("got child policy %v, want %v", xdsCfg.ChildPolicy, wantChildPolicy)
|
||||||
|
}
|
||||||
|
wantFallbackPolicy := &loadBalancingConfig{Name: string(fakeBalancerB), Config: json.RawMessage(`{}`)}
|
||||||
|
if !reflect.DeepEqual(xdsCfg.FallBackPolicy, wantFallbackPolicy) {
|
||||||
|
t.Fatalf("got fallback policy %v, want %v", xdsCfg.FallBackPolicy, wantFallbackPolicy)
|
||||||
|
}
|
||||||
|
}
|
4
go.mod
4
go.mod
@ -4,9 +4,13 @@ require (
|
|||||||
cloud.google.com/go v0.26.0 // indirect
|
cloud.google.com/go v0.26.0 // indirect
|
||||||
github.com/BurntSushi/toml v0.3.1 // indirect
|
github.com/BurntSushi/toml v0.3.1 // indirect
|
||||||
github.com/client9/misspell v0.3.4
|
github.com/client9/misspell v0.3.4
|
||||||
|
github.com/envoyproxy/go-control-plane v0.6.7
|
||||||
|
github.com/gogo/googleapis v1.1.0 // indirect
|
||||||
|
github.com/gogo/protobuf v1.2.0
|
||||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
|
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
|
||||||
github.com/golang/mock v1.1.1
|
github.com/golang/mock v1.1.1
|
||||||
github.com/golang/protobuf v1.2.0
|
github.com/golang/protobuf v1.2.0
|
||||||
|
github.com/lyft/protoc-gen-validate v0.0.13 // indirect
|
||||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3
|
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3
|
||||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d
|
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d
|
||||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be
|
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be
|
||||||
|
8
go.sum
8
go.sum
@ -4,12 +4,20 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
|
|||||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||||
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
|
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
|
||||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||||
|
github.com/envoyproxy/go-control-plane v0.6.7 h1:fyr1xdpt6v/HvZL70++avYT0HggJQD6/dHgNk0+uJjM=
|
||||||
|
github.com/envoyproxy/go-control-plane v0.6.7/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
|
||||||
|
github.com/gogo/googleapis v1.1.0 h1:kFkMAZBNAn4j7K0GiZr8cRYzejq68VbheufiV3YuyFI=
|
||||||
|
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
|
||||||
|
github.com/gogo/protobuf v1.2.0 h1:xU6/SpYbvkNYiptHJYEDRseDLvYE7wSqhYYNy0QSUzI=
|
||||||
|
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
|
||||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
|
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
|
||||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||||
github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8=
|
github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8=
|
||||||
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
|
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
|
||||||
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
|
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
|
||||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||||
|
github.com/lyft/protoc-gen-validate v0.0.13 h1:KNt/RhmQTOLr7Aj8PsJ7mTronaFyx80mRTT9qF261dA=
|
||||||
|
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
|
||||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3 h1:x/bBzNauLQAlE3fLku/xy92Y8QwKX5HZymrMz2IiKFc=
|
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3 h1:x/bBzNauLQAlE3fLku/xy92Y8QwKX5HZymrMz2IiKFc=
|
||||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d h1:g9qWBGx4puODJTMVyoPrpoxPFgVGd+z1DZwjfRu4d0I=
|
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d h1:g9qWBGx4puODJTMVyoPrpoxPFgVGd+z1DZwjfRu4d0I=
|
||||||
|
Reference in New Issue
Block a user