xds: use the shared xds client in eds balancer (#3197)

This PR removes the xds_client implementation from the eds balancer and replaces it with an xds_client wrapper. (The wrapper has a very similar API to the old xds_client implementation, so the changes in the eds balancer are minimal.)

The eds balancer currently doesn't look for an xds_client in attributes, and always creates a new xds_client. The attributes change will be done in a follow-up change.
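At the call site in the balancer, the change amounts to swapping the client constructor and dropping the explicit run goroutine (a condensed sketch of the diff below, not new behavior):

// Before: the eds balancer created and ran its own ADS client.
x.client = newXDSClient(u.BalancerName, u.EDSServiceName, x.buildOpts, x.loadStore, newADS, loseContact, exitCleanup)
go x.client.run()

// After: it builds a thin wrapper around the shared xds_client; the wrapper
// starts the EDS watch (and load reporting) itself, so there is no run().
x.client = newXDSClientWrapper(u.BalancerName, u.EDSServiceName, x.buildOpts, u.LrsLoadReportingServerName, x.loadStore, newADS, loseContact, exitCleanup)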
Menghan Li
2019-11-21 14:10:04 -08:00
committed by GitHub
parent 0e8c6a3281
commit da649b3624
14 changed files with 502 additions and 527 deletions

View File

@ -43,6 +43,9 @@ type XDSConfig struct {
// LRS server to send load reports to. If not present, load reporting
// will be disabled. If set to the empty string, load reporting will
// be sent to the same server that we obtained CDS data from.
//
// TODO: this should be a pointer to a string, so nil means load reporting
// is disabled.
LrsLoadReportingServerName string
}
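As a sketch of what that TODO implies (hypothetical shape, not part of this commit), a *string field would make all three states distinguishable:

// Hypothetical pointer form suggested by the TODO above:
//     LrsLoadReportingServerName *string
var lrsServer *string
switch {
case lrsServer == nil: // load reporting disabled
case *lrsServer == "": // report to the server CDS data came from
default: // report to the server named in *lrsServer
}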

View File

@ -26,6 +26,8 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
)
const testSubConnsCount = 16
@ -166,7 +168,7 @@ func (tls *testLoadStore) CallServerLoad(l internal.Locality, name string, d flo
tls.callsCost = append(tls.callsCost, testServerLoad{name: name, d: d})
}
func (*testLoadStore) ReportTo(ctx context.Context, cc *grpc.ClientConn) {
func (*testLoadStore) ReportTo(ctx context.Context, cc *grpc.ClientConn, clusterName string, node *corepb.Node) {
panic("not implemented")
}

View File

@ -43,7 +43,8 @@ type Store interface {
CallStarted(l internal.Locality)
CallFinished(l internal.Locality, err error)
CallServerLoad(l internal.Locality, name string, d float64)
ReportTo(ctx context.Context, cc *grpc.ClientConn)
// ReportTo reports the load of clusterName to cc.
ReportTo(ctx context.Context, cc *grpc.ClientConn, clusterName string, node *corepb.Node)
}
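ReportTo now takes the cluster name and node proto per call instead of a store-wide serviceName. Since it blocks and retries until its context is canceled, callers run it on its own goroutine (a minimal sketch; store, cc, and node are assumed to be in scope):

ctx, cancel := context.WithCancel(context.Background())
go store.ReportTo(ctx, cc, "my-cluster", node) // blocks; retries with backoff
// ... when load reporting should stop:
cancel()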
type rpcCountData struct {
@ -140,7 +141,6 @@ func (rld *rpcLoadData) loadAndClear() (s float64, c uint64) {
// lrsStore collects loads from xds balancer, and periodically sends load to the
// server.
type lrsStore struct {
serviceName string
backoff backoff.Strategy
lastReported time.Time
@ -149,9 +149,8 @@ type lrsStore struct {
}
// NewStore creates a store for load reports.
func NewStore(serviceName string) Store {
func NewStore() Store {
return &lrsStore{
serviceName: serviceName,
backoff: backoff.DefaultExponential,
lastReported: time.Now(),
}
@ -284,7 +283,7 @@ func (ls *lrsStore) buildStats(clusterName string) []*endpointpb.ClusterStats {
// ReportTo makes a streaming lrs call to cc and blocks.
//
// It retries the call (with backoff) until ctx is canceled.
func (ls *lrsStore) ReportTo(ctx context.Context, cc *grpc.ClientConn) {
func (ls *lrsStore) ReportTo(ctx context.Context, cc *grpc.ClientConn, clusterName string, node *corepb.Node) {
c := lrsgrpc.NewLoadReportingServiceClient(cc)
var (
retryCount int
@ -311,46 +310,46 @@ func (ls *lrsStore) ReportTo(ctx context.Context, cc *grpc.ClientConn) {
doBackoff = true
stream, err := c.StreamLoadStats(ctx)
if err != nil {
grpclog.Infof("lrs: failed to create stream: %v", err)
grpclog.Warningf("lrs: failed to create stream: %v", err)
continue
}
if err := stream.Send(&lrspb.LoadStatsRequest{
// TODO: when moving this to the xds client, the Node
// field needs to be set to node from bootstrap file.
// Node: c.config.NodeProto,
ClusterStats: []*endpointpb.ClusterStats{{
// TODO: this is user's dial target now, as a temporary
// solution. Eventually this will be cluster name from CDS's response.
ClusterName: ls.serviceName,
ClusterName: clusterName,
}},
Node: node,
}); err != nil {
grpclog.Infof("lrs: failed to send first request: %v", err)
grpclog.Warningf("lrs: failed to send first request: %v", err)
continue
}
first, err := stream.Recv()
if err != nil {
grpclog.Infof("lrs: failed to receive first response: %v", err)
grpclog.Warningf("lrs: failed to receive first response: %v", err)
continue
}
interval, err := ptypes.Duration(first.LoadReportingInterval)
if err != nil {
grpclog.Infof("lrs: failed to convert report interval: %v", err)
grpclog.Warningf("lrs: failed to convert report interval: %v", err)
continue
}
if len(first.Clusters) != 1 {
grpclog.Infof("lrs: received multiple clusters %v, expect one cluster", first.Clusters)
grpclog.Warningf("lrs: received multiple clusters %v, expect one cluster", first.Clusters)
continue
}
if first.Clusters[0] != clusterName {
grpclog.Warningf("lrs: received cluster is unexpected. Got %v, want %v", first.Clusters[0], clusterName)
continue
}
if first.ReportEndpointGranularity {
// TODO: fixme to support per endpoint loads.
grpclog.Infof("lrs: endpoint loads requested, but not supported by current implementation")
grpclog.Warningf("lrs: endpoint loads requested, but not supported by current implementation")
continue
}
// No backoff afterwards.
doBackoff = false
retryCount = 0
ls.sendLoads(ctx, stream, first.Clusters[0], interval)
ls.sendLoads(ctx, stream, clusterName, interval)
}
}
@ -366,7 +365,7 @@ func (ls *lrsStore) sendLoads(ctx context.Context, stream lrsgrpc.LoadReportingS
if err := stream.Send(&lrspb.LoadStatsRequest{
ClusterStats: ls.buildStats(clusterName),
}); err != nil {
grpclog.Infof("lrs: failed to send report: %v", err)
grpclog.Warningf("lrs: failed to send report: %v", err)
return
}
}

View File

@ -128,7 +128,7 @@ func Test_lrsStore_buildStats_drops(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ls := NewStore(testService).(*lrsStore)
ls := NewStore().(*lrsStore)
for _, ds := range tt.drops {
var (
@ -255,7 +255,7 @@ func Test_lrsStore_buildStats_rpcCounts(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ls := NewStore(testService).(*lrsStore)
ls := NewStore().(*lrsStore)
// InProgress count doesn't get cleared at each buildStats, keep
// them to carry over.
@ -439,7 +439,7 @@ func Test_lrsStore_ReportTo(t *testing.T) {
})
defer cleanup()
ls := NewStore(testService)
ls := NewStore()
cc, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
t.Fatalf("failed to dial: %v", err)
@ -448,7 +448,7 @@ func Test_lrsStore_ReportTo(t *testing.T) {
defer cancel()
done := make(chan struct{})
go func() {
ls.ReportTo(ctx, cc)
ls.ReportTo(ctx, cc, testService, nil)
close(done)
}()

View File

@ -39,7 +39,7 @@ import (
const (
defaultTimeout = 10 * time.Second
xdsName = "xds_experimental"
edsName = "experimental_eds"
)
var (
@ -53,15 +53,15 @@ var (
)
func init() {
balancer.Register(&xdsBalancerBuilder{})
balancer.Register(&edsBalancerBuilder{})
}
type xdsBalancerBuilder struct{}
type edsBalancerBuilder struct{}
// Build helps implement the balancer.Builder interface.
func (b *xdsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
ctx, cancel := context.WithCancel(context.Background())
x := &xdsBalancer{
x := &edsBalancer{
ctx: ctx,
cancel: cancel,
buildOpts: opts,
@ -71,7 +71,7 @@ func (b *xdsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp
grpcUpdate: make(chan interface{}),
xdsClientUpdate: make(chan interface{}),
timer: createDrainedTimer(), // initialized a timer that won't fire without reset
loadStore: lrs.NewStore(opts.Target.Endpoint),
loadStore: lrs.NewStore(),
}
x.cc = &xdsClientConn{
updateState: x.connStateMgr.updateState,
@ -81,11 +81,11 @@ func (b *xdsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp
return x
}
func (b *xdsBalancerBuilder) Name() string {
return xdsName
func (b *edsBalancerBuilder) Name() string {
return edsName
}
func (b *xdsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
func (b *edsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
var cfg XDSConfig
if err := json.Unmarshal(c, &cfg); err != nil {
return nil, fmt.Errorf("unable to unmarshal balancer config %s into xds config", string(c))
@ -94,7 +94,7 @@ func (b *xdsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadB
}
// edsBalancerInterface defines the interface that edsBalancer must implement to
// communicate with xdsBalancer.
// communicate with edsBalancer.
//
// It's implemented by the real eds balancer and a fake testing eds balancer.
type edsBalancerInterface interface {
@ -108,11 +108,11 @@ type edsBalancerInterface interface {
Close()
}
var _ balancer.V2Balancer = (*xdsBalancer)(nil) // Assert that we implement V2Balancer
var _ balancer.V2Balancer = (*edsBalancer)(nil) // Assert that we implement V2Balancer
// xdsBalancer manages xdsClient and the actual balancer that does load balancing (either edsBalancer,
// edsBalancer manages xdsClient and the actual balancer that does load balancing (either edsBalancer,
// or fallback LB).
type xdsBalancer struct {
type edsBalancer struct {
cc balancer.ClientConn // *xdsClientConn
buildOpts balancer.BuildOptions
startupTimeout time.Duration
@ -120,25 +120,29 @@ type xdsBalancer struct {
connStateMgr *connStateMgr
ctx context.Context
cancel context.CancelFunc
startup bool // startup indicates whether this xdsBalancer is in startup stage.
startup bool // startup indicates whether this edsBalancer is in startup stage.
inFallbackMonitor bool
// xdsBalancer continuously monitor the channels below, and will handle events from them in sync.
// edsBalancer continuously monitors the channels below, and will handle events from them in sync.
grpcUpdate chan interface{}
xdsClientUpdate chan interface{}
timer *time.Timer
noSubConnAlert <-chan struct{}
client *client // may change when passed a different service config
config *XDSConfig // may change when passed a different service config
client *xdsclientWrapper // may change when passed a different service config
config *XDSConfig // may change when passed a different service config
xdsLB edsBalancerInterface
fallbackLB balancer.Balancer
fallbackInitData *resolver.State // may change when HandleResolved address is called
loadStore lrs.Store
}
func (x *xdsBalancer) startNewXDSClient(u *XDSConfig) {
// If the xdsBalancer is in startup stage, then we need to apply the startup timeout for the first
// TODO: clean up this function, or just remove it. It was here because the xds
// server name from service config can change, and we need to migrate from the
// old one to the new one. Now the xds server name is specified by the bootstrap
// file, and should never change. There's no need for this.
func (x *edsBalancer) startNewXDSClient(u *XDSConfig) {
// If the edsBalancer is in startup stage, then we need to apply the startup timeout for the first
// xdsClient to get a response from the traffic director.
if x.startup {
x.startFallbackMonitoring()
@ -148,7 +152,7 @@ func (x *xdsBalancer) startNewXDSClient(u *XDSConfig) {
// connect to it. However, previous xds client should not be closed until the new one successfully
// connects to the traffic director (i.e. get an ADS response from the traffic director). Therefore,
// we let each new client be responsible for closing its immediate predecessor. In this way,
// xdsBalancer does not to implement complex synchronization to achieve the same purpose.
// edsBalancer does not need to implement complex synchronization to achieve the same purpose.
prevClient := x.client
// haveGotADS being true means this xdsClient has received an ADS response from the director in the past, which
// means it can close the previous client if it hasn't, and it can now send a lose-contact signal for
@ -189,14 +193,20 @@ func (x *xdsBalancer) startNewXDSClient(u *XDSConfig) {
prevClient.close()
}
}
x.client = newXDSClient(u.BalancerName, u.EDSServiceName, x.buildOpts, x.loadStore, newADS, loseContact, exitCleanup)
go x.client.run()
// TODO: handle cfg.LrsLoadReportingServerName and remove log.
if u.LrsLoadReportingServerName != "" {
grpclog.Warningf("xds: lrsLoadReportingServerName is not empty, but is not handled")
}
// TODO: stop using u.BalancerName. The value should come from bootstrap
// file. It's only used in tests now.
x.client = newXDSClientWrapper(u.BalancerName, u.EDSServiceName, x.buildOpts, u.LrsLoadReportingServerName, x.loadStore, newADS, loseContact, exitCleanup)
}
// run gets executed in a goroutine once xdsBalancer is created. It monitors updates from grpc,
// xdsClient and load balancer. It synchronizes the operations that happen inside xdsBalancer. It
// exits when xdsBalancer is closed.
func (x *xdsBalancer) run() {
// run gets executed in a goroutine once edsBalancer is created. It monitors updates from grpc,
// xdsClient and load balancer. It synchronizes the operations that happen inside edsBalancer. It
// exits when edsBalancer is closed.
func (x *edsBalancer) run() {
for {
select {
case update := <-x.grpcUpdate:
@ -222,7 +232,7 @@ func (x *xdsBalancer) run() {
}
}
func (x *xdsBalancer) handleGRPCUpdate(update interface{}) {
func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
switch u := update.(type) {
case *subConnStateUpdate:
if x.xdsLB != nil {
@ -245,11 +255,6 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) {
var fallbackChanged bool
// service config has been updated.
if !reflect.DeepEqual(cfg, x.config) {
// TODO: handle cfg.LrsLoadReportingServerName and remove log.
if cfg.LrsLoadReportingServerName != "" {
grpclog.Warningf("xds: lrsLoadReportingServerName is not empty, but is not handled")
}
if x.config == nil {
// The first time we get config, we just need to start the xdsClient.
x.startNewXDSClient(cfg)
@ -262,8 +267,12 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) {
return
}
// With a different BalancerName, we need to create a new xdsClient.
if cfg.BalancerName != x.config.BalancerName {
// Create a different xds_client if part of the config is different.
//
// TODO: this and all client related code, including comparing new
// config with old, creating new client, should be moved to a
// dedicated struct, and handled together.
if cfg.BalancerName != x.config.BalancerName || cfg.EDSServiceName != x.config.EDSServiceName || cfg.LrsLoadReportingServerName != x.config.LrsLoadReportingServerName {
x.startNewXDSClient(cfg)
}
// We will update the xdsLB with the new child policy, if we got a
@ -301,7 +310,7 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) {
}
}
func (x *xdsBalancer) handleXDSClientUpdate(update interface{}) {
func (x *edsBalancer) handleXDSClientUpdate(update interface{}) {
switch u := update.(type) {
case *edsResp:
select {
@ -378,15 +387,15 @@ type subConnStateUpdate struct {
state balancer.SubConnState
}
func (x *xdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
func (x *edsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
grpclog.Error("UpdateSubConnState should be called instead of HandleSubConnStateChange")
}
func (x *xdsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
func (x *edsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
grpclog.Error("UpdateResolverState should be called instead of HandleResolvedAddrs")
}
func (x *xdsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
func (x *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
update := &subConnStateUpdate{
sc: sc,
state: state,
@ -397,11 +406,11 @@ func (x *xdsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Sub
}
}
func (x *xdsBalancer) ResolverError(error) {
func (x *edsBalancer) ResolverError(error) {
// Ignore for now
}
func (x *xdsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
select {
case x.grpcUpdate <- &s:
case <-x.ctx.Done():
@ -414,7 +423,7 @@ type edsResp struct {
resp *xdsclient.EDSUpdate
}
func (x *xdsBalancer) newADSResponse(ctx context.Context, resp *xdsclient.EDSUpdate) error {
func (x *edsBalancer) newADSResponse(ctx context.Context, resp *xdsclient.EDSUpdate) error {
select {
case x.xdsClientUpdate <- &edsResp{ctx: ctx, resp: resp}:
case <-x.ctx.Done():
@ -428,7 +437,7 @@ type loseContact struct {
ctx context.Context
}
func (x *xdsBalancer) loseContact(ctx context.Context) {
func (x *edsBalancer) loseContact(ctx context.Context) {
select {
case x.xdsClientUpdate <- &loseContact{ctx: ctx}:
case <-x.ctx.Done():
@ -436,7 +445,7 @@ func (x *xdsBalancer) loseContact(ctx context.Context) {
}
}
func (x *xdsBalancer) switchFallback() {
func (x *edsBalancer) switchFallback() {
if x.xdsLB != nil {
x.xdsLB.Close()
x.xdsLB = nil
@ -446,7 +455,7 @@ func (x *xdsBalancer) switchFallback() {
x.cancelFallbackMonitoring()
}
func (x *xdsBalancer) updateFallbackWithResolverState(s *resolver.State) {
func (x *edsBalancer) updateFallbackWithResolverState(s *resolver.State) {
if lb, ok := x.fallbackLB.(balancer.V2Balancer); ok {
lb.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{
Addresses: s.Addresses,
@ -462,7 +471,7 @@ func (x *xdsBalancer) updateFallbackWithResolverState(s *resolver.State) {
// It will cancel fallback monitoring if we are in fallback monitoring stage.
// If there's no running edsBalancer currently, it will create one and initialize it. Also, it will
// shutdown the fallback balancer if there's one running.
func (x *xdsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() {
func (x *edsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() {
// xDS update will cancel fallback monitoring if we are in fallback monitoring stage.
x.cancelFallbackMonitoring()
@ -479,7 +488,7 @@ func (x *xdsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() {
}
}
func (x *xdsBalancer) buildFallBackBalancer(c *XDSConfig) {
func (x *edsBalancer) buildFallBackBalancer(c *XDSConfig) {
if c.FallBackPolicy == nil {
x.buildFallBackBalancer(&XDSConfig{
FallBackPolicy: &loadBalancingConfig{
@ -501,7 +510,7 @@ func (x *xdsBalancer) buildFallBackBalancer(c *XDSConfig) {
// timeout.
// 2. After xds client loses contact with the remote, fallback if all connections to the backends are
// lost (i.e. not in state READY).
func (x *xdsBalancer) startFallbackMonitoring() {
func (x *edsBalancer) startFallbackMonitoring() {
if x.startup {
x.startup = false
x.timer.Reset(x.startupTimeout)
@ -520,7 +529,7 @@ func (x *xdsBalancer) startFallbackMonitoring() {
// There are two cases where fallback monitoring should be canceled:
// 1. xDS client returns a new ADS message.
// 2. fallback has been triggered.
func (x *xdsBalancer) cancelFallbackMonitoring() {
func (x *edsBalancer) cancelFallbackMonitoring() {
if !x.timer.Stop() {
select {
case <-x.timer.C:
@ -534,7 +543,7 @@ func (x *xdsBalancer) cancelFallbackMonitoring() {
x.inFallbackMonitor = false
}
func (x *xdsBalancer) Close() {
func (x *edsBalancer) Close() {
x.cancel()
}

View File

@ -1,260 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package balancer
import (
"context"
"net"
"sync"
"time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
xdsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/xds/internal/balancer/lrs"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
)
const (
edsType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
)
// client is responsible for connecting to the specified traffic director, passing the received
// ADS response from the traffic director, and sending notification when communication with the
// traffic director is lost.
type client struct {
ctx context.Context
cancel context.CancelFunc
serviceName string
cli xdsgrpc.AggregatedDiscoveryServiceClient
dialer func(context.Context, string) (net.Conn, error)
channelzParentID int64
newADS func(ctx context.Context, resp *xdsclient.EDSUpdate) error
loseContact func(ctx context.Context)
cleanup func()
backoff backoff.Strategy
loadStore lrs.Store
loadReportOnce sync.Once
config *bootstrap.Config
mu sync.Mutex
cc *grpc.ClientConn
}
func (c *client) run() {
c.dial()
c.makeADSCall()
}
func (c *client) close() {
c.cancel()
c.mu.Lock()
if c.cc != nil {
c.cc.Close()
}
c.mu.Unlock()
c.cleanup()
}
func (c *client) dial() {
dopts := []grpc.DialOption{c.config.Creds}
if c.dialer != nil {
dopts = append(dopts, grpc.WithContextDialer(c.dialer))
}
// Explicitly set pickfirst as the balancer.
dopts = append(dopts, grpc.WithBalancerName(grpc.PickFirstBalancerName))
if channelz.IsOn() {
dopts = append(dopts, grpc.WithChannelzParentID(c.channelzParentID))
}
cc, err := grpc.DialContext(c.ctx, c.config.BalancerName, dopts...)
if err != nil {
// This could fail due to ctx error, which means this client was closed.
grpclog.Warningf("xds: failed to dial: %v", err)
return
}
c.mu.Lock()
select {
case <-c.ctx.Done():
cc.Close()
default:
// only assign c.cc when xds client has not been closed, to prevent ClientConn leak.
c.cc = cc
}
c.mu.Unlock()
}
func (c *client) newEDSRequest() *xdspb.DiscoveryRequest {
edsReq := &xdspb.DiscoveryRequest{
Node: c.config.NodeProto,
// TODO: the expected ResourceName could be in a different format from
// dial target. (test_service.test_namespace.traffic_director.com vs
// test_namespace:test_service).
//
// The solution today is to always set dial target in resource_names.
//
// A future solution could be: always do CDS, get cluster name from CDS
// response, and use it here.
// `ResourceNames: []string{c.clusterName},`
TypeUrl: edsType,
ResourceNames: []string{c.serviceName},
}
return edsReq
}
func (c *client) makeADSCall() {
c.cli = xdsgrpc.NewAggregatedDiscoveryServiceClient(c.cc)
retryCount := 0
var doRetry bool
for {
select {
case <-c.ctx.Done():
return
default:
}
if doRetry {
backoffTimer := time.NewTimer(c.backoff.Backoff(retryCount))
select {
case <-backoffTimer.C:
case <-c.ctx.Done():
backoffTimer.Stop()
return
}
retryCount++
}
firstRespReceived := c.adsCallAttempt()
if firstRespReceived {
retryCount = 0
doRetry = false
} else {
doRetry = true
}
c.loseContact(c.ctx)
}
}
func (c *client) adsCallAttempt() (firstRespReceived bool) {
firstRespReceived = false
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
st, err := c.cli.StreamAggregatedResources(ctx, grpc.WaitForReady(true))
if err != nil {
grpclog.Infof("xds: failed to initial ADS streaming RPC due to %v", err)
return
}
if err := st.Send(c.newEDSRequest()); err != nil {
// current stream is broken, start a new one.
grpclog.Infof("xds: ads RPC failed due to err: %v, when sending the EDS request", err)
return
}
for {
resp, err := st.Recv()
if err != nil {
// current stream is broken, start a new one.
grpclog.Infof("xds: ads RPC failed due to err: %v, when receiving the response", err)
return
}
firstRespReceived = true
resources := resp.GetResources()
if len(resources) < 1 {
grpclog.Warning("xds: ADS response contains 0 resource info.")
// start a new call as server misbehaves by sending a ADS response with 0 resource info.
return
}
if resp.GetTypeUrl() != edsType {
grpclog.Warningf("xds: received non-EDS response: %v", resp.GetTypeUrl())
return
}
var adsResp ptypes.DynamicAny
if err := ptypes.UnmarshalAny(resources[0], &adsResp); err != nil {
grpclog.Warningf("xds: failed to unmarshal resources due to %v.", err)
return
}
parsed, err := xdsclient.ParseEDSRespProto(adsResp.Message.(*xdspb.ClusterLoadAssignment))
if err != nil {
grpclog.Warningf("xds: parsing new EDS message failed due to %v.", err)
return
}
if err := c.newADS(c.ctx, parsed); err != nil {
grpclog.Warningf("xds: processing new ADS message failed due to %v.", err)
return
}
// Only start load reporting after ADS resp is received.
//
// Also, newADS() will close the previous load reporting stream, so we
// don't have double reporting.
c.loadReportOnce.Do(func() {
if c.loadStore != nil {
go c.loadStore.ReportTo(c.ctx, c.cc)
}
})
}
}
func newXDSClient(balancerName string, edsServiceName string, opts balancer.BuildOptions, loadStore lrs.Store, newADS func(context.Context, *xdsclient.EDSUpdate) error, loseContact func(ctx context.Context), exitCleanup func()) *client {
c := &client{
serviceName: edsServiceName,
dialer: opts.Dialer,
channelzParentID: opts.ChannelzParentID,
newADS: newADS,
loseContact: loseContact,
cleanup: exitCleanup,
backoff: backoff.DefaultExponential,
loadStore: loadStore,
}
if c.serviceName == "" {
c.serviceName = opts.Target.Endpoint
}
c.ctx, c.cancel = context.WithCancel(context.Background())
// It is possible that NewConfig returns a Config object with certain
// fields left unspecified. If so, we need to use some sane defaults here.
c.config = bootstrap.NewConfig()
if c.config.BalancerName == "" {
c.config.BalancerName = balancerName
}
if c.config.Creds == nil {
c.config.Creds = credsFromDefaults(balancerName, &opts)
}
return c
}
func credsFromDefaults(balancerName string, opts *balancer.BuildOptions) grpc.DialOption {
if opts.DialCreds == nil {
grpclog.Warning("xds: no credentials available, using Insecure")
return grpc.WithInsecure()
}
if err := opts.DialCreds.OverrideServerName(balancerName); err != nil {
grpclog.Warningf("xds: failed to override the server name in credentials: %v, using Insecure", err)
return grpc.WithInsecure()
}
return grpc.WithTransportCredentials(opts.DialCreds)
}

View File

@ -23,7 +23,6 @@ import (
"io"
"net"
"testing"
"time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
@ -44,15 +43,15 @@ import (
xdsclient "google.golang.org/grpc/xds/internal/client"
)
const (
edsType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
)
var (
testServiceName = "test/foo"
testEDSServiceName = "test/service/eds"
testEDSReq = &xdspb.DiscoveryRequest{
TypeUrl: edsType,
ResourceNames: []string{testServiceName},
}
testServiceName = "test/foo"
testEDSClusterName = "test/service/eds"
testClusterLoadAssignment = &xdspb.ClusterLoadAssignment{
ClusterName: testServiceName,
ClusterName: testEDSClusterName,
Endpoints: []*endpointpb.LocalityLbEndpoints{{
Locality: &corepb.Locality{
Region: "asia-east1",
@ -184,8 +183,6 @@ type testConfig struct {
expectedRequests []*xdspb.DiscoveryRequest
responsesToSend []*xdspb.DiscoveryResponse
expectedADSResponses []proto.Message
adsErr error
svrErr error
}
func setupServer(t *testing.T) (addr string, td *testTrafficDirector, lrss *lrsServer, cleanup func()) {
@ -214,15 +211,19 @@ func setupServer(t *testing.T) (addr string, td *testTrafficDirector, lrss *lrsS
func (s) TestXdsClientResponseHandling(t *testing.T) {
for _, test := range []*testConfig{
{
expectedRequests: []*xdspb.DiscoveryRequest{testEDSReq},
responsesToSend: []*xdspb.DiscoveryResponse{testEDSResp},
expectedADSResponses: []proto.Message{testClusterLoadAssignment},
},
{
edsServiceName: testEDSServiceName,
// Test that if clusterName is not set, the dial target is used.
expectedRequests: []*xdspb.DiscoveryRequest{{
TypeUrl: edsType,
ResourceNames: []string{testEDSServiceName},
ResourceNames: []string{testServiceName}, // ResourceName is the dial target.
Node: &corepb.Node{},
}},
},
{
edsServiceName: testEDSClusterName,
expectedRequests: []*xdspb.DiscoveryRequest{{
TypeUrl: edsType,
ResourceNames: []string{testEDSClusterName},
Node: &corepb.Node{},
}},
responsesToSend: []*xdspb.DiscoveryResponse{testEDSResp},
expectedADSResponses: []proto.Message{testClusterLoadAssignment},
@ -240,9 +241,8 @@ func testXdsClientResponseHandling(t *testing.T, test *testConfig) {
adsChan <- i
return nil
}
client := newXDSClient(addr, test.edsServiceName, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, func(context.Context) {}, func() {})
client := newXDSClientWrapper(addr, test.edsServiceName, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, "", nil, newADS, func(context.Context) {}, func() {})
defer client.close()
go client.run()
for _, expectedReq := range test.expectedRequests {
req := td.getReq()
@ -250,7 +250,7 @@ func testXdsClientResponseHandling(t *testing.T, test *testConfig) {
t.Fatalf("ads RPC failed with err: %v", req.err)
}
if !proto.Equal(req.req, expectedReq) {
t.Fatalf("got ADS request %T %v, expected: %T %v", req.req, req.req, expectedReq, expectedReq)
t.Fatalf("got ADS request %T, expected: %T, diff: %s", req.req, expectedReq, cmp.Diff(req.req, expectedReq, cmp.Comparer(proto.Equal)))
}
}
@ -266,132 +266,3 @@ func testXdsClientResponseHandling(t *testing.T, test *testConfig) {
}
}
}
func (s) TestXdsClientLoseContact(t *testing.T) {
for _, test := range []*testConfig{{
responsesToSend: []*xdspb.DiscoveryResponse{testEDSResp},
}} {
testXdsClientLoseContactRemoteClose(t, test)
}
for _, test := range []*testConfig{{
responsesToSend: []*xdspb.DiscoveryResponse{{
Resources: []*anypb.Any{
{
TypeUrl: "not-eds",
Value: marshaledClusterLoadAssignment,
},
},
TypeUrl: "not-eds",
}},
}} {
testXdsClientLoseContactADSRelatedErrorOccur(t, test)
}
}
func testXdsClientLoseContactRemoteClose(t *testing.T, test *testConfig) {
addr, td, _, cleanup := setupServer(t)
defer cleanup()
adsChan := make(chan *xdsclient.EDSUpdate, 10)
newADS := func(ctx context.Context, i *xdsclient.EDSUpdate) error {
adsChan <- i
return nil
}
contactChan := make(chan *loseContact, 10)
loseContactFunc := func(context.Context) {
contactChan <- &loseContact{}
}
client := newXDSClient(addr, test.edsServiceName, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, loseContactFunc, func() {})
defer client.close()
go client.run()
// make sure server side get the request (i.e stream created successfully on client side)
td.getReq()
for _, resp := range test.responsesToSend {
td.sendResp(&response{resp: resp})
// make sure client side receives it
<-adsChan
}
cleanup()
select {
case <-contactChan:
case <-time.After(2 * time.Second):
t.Fatal("time out when expecting lost contact signal")
}
}
func testXdsClientLoseContactADSRelatedErrorOccur(t *testing.T, test *testConfig) {
addr, td, _, cleanup := setupServer(t)
defer cleanup()
adsChan := make(chan *xdsclient.EDSUpdate, 10)
newADS := func(ctx context.Context, i *xdsclient.EDSUpdate) error {
adsChan <- i
return test.adsErr
}
contactChan := make(chan *loseContact, 10)
loseContactFunc := func(context.Context) {
contactChan <- &loseContact{}
}
client := newXDSClient(addr, test.edsServiceName, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, loseContactFunc, func() {})
defer client.close()
go client.run()
// make sure server side get the request (i.e stream created successfully on client side)
td.getReq()
for _, resp := range test.responsesToSend {
td.sendResp(&response{resp: resp})
}
select {
case <-contactChan:
case <-time.After(2 * time.Second):
t.Fatal("time out when expecting lost contact signal")
}
}
func (s) TestXdsClientExponentialRetry(t *testing.T) {
cfg := &testConfig{
svrErr: status.Errorf(codes.Aborted, "abort the stream to trigger retry"),
}
addr, td, _, cleanup := setupServer(t)
defer cleanup()
adsChan := make(chan *xdsclient.EDSUpdate, 10)
newADS := func(ctx context.Context, i *xdsclient.EDSUpdate) error {
adsChan <- i
return nil
}
contactChan := make(chan *loseContact, 10)
loseContactFunc := func(context.Context) {
contactChan <- &loseContact{}
}
client := newXDSClient(addr, "", balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, loseContactFunc, func() {})
defer client.close()
go client.run()
var secondRetry, thirdRetry time.Time
for i := 0; i < 3; i++ {
// make sure server side get the request (i.e stream created successfully on client side)
td.getReq()
td.sendResp(&response{err: cfg.svrErr})
select {
case <-contactChan:
if i == 1 {
secondRetry = time.Now()
}
if i == 2 {
thirdRetry = time.Now()
}
case <-time.After(2 * time.Second):
t.Fatal("time out when expecting lost contact signal")
}
}
if thirdRetry.Sub(secondRetry) < 1*time.Second {
t.Fatalf("interval between second and third retry is %v, expected > 1s", thirdRetry.Sub(secondRetry))
}
}

View File

@ -0,0 +1,151 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package balancer
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/xds/internal/balancer/lrs"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
)
// xdsClientInterface contains only the xds_client methods needed by the EDS
// balancer. It's defined so we can override the xdsclientNew function in tests.
type xdsClientInterface interface {
WatchEDS(clusterName string, edsCb func(*xdsclient.EDSUpdate, error)) (cancel func())
ReportLoad(server string, clusterName string, loadStore lrs.Store) (cancel func())
Close()
}
var (
xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) {
return xdsclient.New(opts)
}
bootstrapConfigNew = bootstrap.NewConfig
)
// xdsclientWrapper is responsible for getting the xds client from attributes or
// creating a new xds client, and for starting the EDS watch. The given callbacks
// will be called with EDS updates or errors.
type xdsclientWrapper struct {
ctx context.Context
cancel context.CancelFunc
cancelWatch func()
cancelLoadReport func()
cleanup func()
xdsclient xdsClientInterface
}
func (c *xdsclientWrapper) close() {
if c.cancelLoadReport != nil {
c.cancelLoadReport()
}
if c.cancelWatch != nil {
c.cancelWatch()
}
if c.xdsclient != nil {
// TODO: this shouldn't close xdsclient if it's from attributes.
c.xdsclient.Close()
}
c.cleanup()
}
func newXDSClientWrapper(balancerName string, edsServiceName string, bbo balancer.BuildOptions, loadReportServer string, loadStore lrs.Store, newADS func(context.Context, *xdsclient.EDSUpdate) error, loseContact func(ctx context.Context), exitCleanup func()) *xdsclientWrapper {
ctx, cancel := context.WithCancel(context.Background())
ret := &xdsclientWrapper{
ctx: ctx,
cancel: cancel,
cleanup: exitCleanup,
}
// TODO: get xdsclient from Attributes instead of creating a new one.
config := bootstrapConfigNew()
if config.BalancerName == "" {
config.BalancerName = balancerName
}
if config.Creds == nil {
// TODO: Once we start supporting a mechanism to register credential
// types, a failure to find the credential type mentioned in the
// bootstrap file should result in a failure, and not in using
// credentials from the parent channel (passed through the
// resolver.BuildOptions).
config.Creds = defaultDialCreds(config.BalancerName, bbo)
}
var dopts []grpc.DialOption
if bbo.Dialer != nil {
dopts = []grpc.DialOption{grpc.WithContextDialer(bbo.Dialer)}
}
c, err := xdsclientNew(xdsclient.Options{Config: *config, DialOpts: dopts})
if err != nil {
grpclog.Warningf("failed to create xdsclient, error: %v", err)
return ret
}
ret.xdsclient = c
// The clusterName to watch should come from the CDS response, via service
// config. If it's an empty string, fall back to the user's dial target.
nameToWatch := edsServiceName
if nameToWatch == "" {
grpclog.Warningf("eds: cluster name to watch is an empty string. Fallback to user's dial target")
nameToWatch = bbo.Target.Endpoint
}
ret.cancelWatch = ret.xdsclient.WatchEDS(nameToWatch, func(update *xdsclient.EDSUpdate, err error) {
if err != nil {
loseContact(ret.ctx)
return
}
if err := newADS(ret.ctx, update); err != nil {
grpclog.Warningf("xds: processing new EDS update failed due to %v.", err)
}
})
if loadStore != nil {
ret.cancelLoadReport = ret.xdsclient.ReportLoad(loadReportServer, nameToWatch, loadStore)
}
return ret
}
// defaultDialCreds builds a DialOption containing the credentials to be used
// while talking to the xDS server (this is done only if the xds bootstrap
// process does not return any credentials to use). If the parent channel
// contains DialCreds, we use it as is. If it contains a CredsBundle, we use
// just the transport credentials from the bundle. If we don't find any
// credentials on the parent channel, we resort to using an insecure channel.
func defaultDialCreds(balancerName string, rbo balancer.BuildOptions) grpc.DialOption {
switch {
case rbo.DialCreds != nil:
if err := rbo.DialCreds.OverrideServerName(balancerName); err != nil {
grpclog.Warningf("xds: failed to override server name in credentials: %v, using Insecure", err)
return grpc.WithInsecure()
}
return grpc.WithTransportCredentials(rbo.DialCreds)
case rbo.CredsBundle != nil:
return grpc.WithTransportCredentials(rbo.CredsBundle.TransportCredentials())
default:
grpclog.Warning("xds: no credentials available, using Insecure")
return grpc.WithInsecure()
}
}

View File

@ -24,6 +24,7 @@ import (
"testing"
"time"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
@ -50,13 +51,14 @@ func (lrss *lrsServer) StreamLoadStats(stream lrsgrpc.LoadReportingService_Strea
}
if !proto.Equal(req, &lrspb.LoadStatsRequest{
ClusterStats: []*endpointpb.ClusterStats{{
ClusterName: testServiceName,
ClusterName: testEDSClusterName,
}},
Node: &corepb.Node{},
}) {
return status.Errorf(codes.FailedPrecondition, "unexpected req: %+v", req)
}
if err := stream.Send(&lrspb.LoadStatsResponse{
Clusters: []string{testServiceName},
Clusters: []string{testEDSClusterName},
LoadReportingInterval: lrss.reportingInterval,
}); err != nil {
return err
@ -87,11 +89,11 @@ func (s) TestXdsLoadReporting(t *testing.T) {
newEDSBalancer = originalNewEDSBalancer
}()
builder := balancer.Get(xdsName)
builder := balancer.Get(edsName)
cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer)
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
if !ok {
t.Fatalf("unable to type assert to *xdsBalancer")
t.Fatalf("unable to type assert to *edsBalancer")
}
defer lb.Close()

View File

@ -0,0 +1,46 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package balancer
import "google.golang.org/grpc/balancer"
// The old xds balancer implements logic for both CDS and EDS. With the new
// design, CDS is split and moved to a separate balancer, and the xds balancer
// becomes the EDS balancer.
//
// To keep the existing tests working, this file registers the EDS balancer
// under the old xds balancer name.
//
// TODO: delete this file when migration to new workflow (LDS, RDS, CDS, EDS) is
// done.
const xdsName = "xds_experimental"
func init() {
balancer.Register(&xdsBalancerBuilder{})
}
// xdsBalancerBuilder registers edsBalancerBuilder (now with the name
// "experimental_eds") under the old name "xds_experimental".
type xdsBalancerBuilder struct {
edsBalancerBuilder
}
func (b *xdsBalancerBuilder) Name() string {
return xdsName
}
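Both names therefore resolve to the same builder logic (a sketch):

// Registered in the respective init() functions; both Build() calls
// produce an *edsBalancer.
edsB := balancer.Get("experimental_eds") // returns the edsBalancerBuilder
xdsB := balancer.Get("xds_experimental") // returns the xdsBalancerBuilder embedding it
_, _ = edsB, xdsB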

View File

@ -21,15 +21,18 @@ package balancer
import (
"bytes"
"encoding/json"
"fmt"
"reflect"
"sync"
"testing"
"time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/golang/protobuf/jsonpb"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpctest"
@ -39,14 +42,23 @@ import (
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/lrs"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
)
var lbABuilder = &balancerABuilder{}
func init() {
balancer.Register(&xdsBalancerBuilder{})
balancer.Register(&edsBalancerBuilder{})
balancer.Register(lbABuilder)
balancer.Register(&balancerBBuilder{})
bootstrapConfigNew = func() *bootstrap.Config {
return &bootstrap.Config{
BalancerName: "",
Creds: grpc.WithInsecure(),
NodeProto: &corepb.Node{},
}
}
}
type s struct{}
@ -70,6 +82,7 @@ var (
BalancerName: testBalancerNameFooBar,
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerB},
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
EDSServiceName: testEDSClusterName,
}
specialAddrForBalancerA = resolver.Address{Addr: "this.is.balancer.A"}
@ -233,6 +246,40 @@ type fakeSubConn struct{}
func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") }
func (*fakeSubConn) Connect() { panic("implement me") }
type fakeXDSClient struct {
edsCbReceived chan struct{} // Will be closed when WatchEDS is called.
edsCb func(*xdsclient.EDSUpdate, error)
}
func newFakeXDSClient() *fakeXDSClient {
return &fakeXDSClient{edsCbReceived: make(chan struct{})}
}
func (c *fakeXDSClient) WatchEDS(clusterName string, edsCb func(*xdsclient.EDSUpdate, error)) (cancel func()) {
c.edsCb = edsCb
// WatchEDS is expected to be called only once in the test. If a test needs
// to call it multiple times, this will panic.
close(c.edsCbReceived)
return func() {}
}
func (c *fakeXDSClient) callEDSCallback(u *xdsclient.EDSUpdate, err error) {
t := time.NewTimer(time.Second)
select {
case <-c.edsCbReceived:
t.Stop()
case <-t.C:
panic("EDS callback is not received after 1 second")
}
c.edsCb(u, err)
}
func (c *fakeXDSClient) ReportLoad(server string, clusterName string, loadStore lrs.Store) (cancel func()) {
return func() {}
}
func (c *fakeXDSClient) Close() {}
// TestXdsFallbackResolvedAddrs verifies that the fallback balancer specified
// in the provided lbconfig is initialized, and that it receives the addresses
// pushed by the resolver.
@ -252,12 +299,12 @@ func (s) TestXdsFallbackResolvedAddrs(t *testing.T) {
startupTimeout = 500 * time.Millisecond
defer func() { startupTimeout = defaultTimeout }()
builder := balancer.Get(xdsName)
builder := balancer.Get(edsName)
cc := newTestClientConn()
b := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
lb, ok := b.(*xdsBalancer)
lb, ok := b.(*edsBalancer)
if !ok {
t.Fatalf("builder.Build() returned a balancer of type %T, want *xdsBalancer", b)
t.Fatalf("builder.Build() returned a balancer of type %T, want *edsBalancer", b)
}
defer lb.Close()
@ -300,11 +347,11 @@ func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) {
newEDSBalancer = originalNewEDSBalancer
}()
builder := balancer.Get(xdsName)
builder := balancer.Get(edsName)
cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer)
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer)
if !ok {
t.Fatalf("unable to type assert to *xdsBalancer")
t.Fatalf("unable to type assert to *edsBalancer")
}
defer lb.Close()
addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
@ -338,6 +385,7 @@ func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) {
BalancerName: addr,
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
EDSServiceName: testEDSClusterName,
}
lb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs},
@ -378,11 +426,11 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) {
newEDSBalancer = originalNewEDSBalancer
}()
builder := balancer.Get(xdsName)
builder := balancer.Get(edsName)
cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer)
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer)
if !ok {
t.Fatalf("unable to type assert to *xdsBalancer")
t.Fatalf("unable to type assert to *edsBalancer")
}
defer lb.Close()
@ -403,6 +451,7 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) {
Name: fakeBalancerA,
Config: json.RawMessage("{}"),
},
EDSServiceName: testEDSClusterName,
},
responseToSend: testEDSResp,
expectedChildPolicy: &loadBalancingConfig{
@ -416,6 +465,7 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) {
Name: fakeBalancerB,
Config: json.RawMessage("{}"),
},
EDSServiceName: testEDSClusterName,
},
expectedChildPolicy: &loadBalancingConfig{
Name: string(fakeBalancerB),
@ -461,18 +511,25 @@ func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) {
newEDSBalancer = originalNewEDSBalancer
}()
builder := balancer.Get(xdsName)
testXDSClient := newFakeXDSClient()
originalxdsclientNew := xdsclientNew
xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) {
return testXDSClient, nil
}
defer func() {
xdsclientNew = originalxdsclientNew
}()
builder := balancer.Get(edsName)
cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer)
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
if !ok {
t.Fatalf("unable to type assert to *xdsBalancer")
t.Fatalf("unable to type assert to *edsBalancer")
}
defer lb.Close()
addr, td, _, cleanup := setupServer(t)
cfg := XDSConfig{
BalancerName: addr,
BalancerName: "",
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
}
@ -486,7 +543,9 @@ func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) {
BalancerConfig: &cfg2,
})
td.sendResp(&response{resp: testEDSResp})
// Callback with an EDS update, the balancer will build an EDS balancer, not
// a fallback.
testXDSClient.callEDSCallback(xdsclient.ParseEDSRespProtoForTesting(testClusterLoadAssignment), nil)
var i int
for i = 0; i < 10; i++ {
@ -499,7 +558,8 @@ func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) {
t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s")
}
cleanup()
// Callback with an error, the balancer should switch to fallback.
testXDSClient.callEDSCallback(nil, fmt.Errorf("xds client error"))
// verify fallback balancer B takes over
select {
@ -536,24 +596,34 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) {
newEDSBalancer = originalNewEDSBalancer
}()
builder := balancer.Get(xdsName)
testXDSClient := newFakeXDSClient()
originalxdsclientNew := xdsclientNew
xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) {
return testXDSClient, nil
}
defer func() {
xdsclientNew = originalxdsclientNew
}()
builder := balancer.Get(edsName)
cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer)
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
if !ok {
t.Fatalf("unable to type assert to *xdsBalancer")
t.Fatalf("unable to type assert to *edsBalancer")
}
defer lb.Close()
addr, td, _, cleanup := setupServer(t)
defer cleanup()
cfg := &XDSConfig{
BalancerName: addr,
BalancerName: "",
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
EDSServiceName: testEDSClusterName,
}
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg})
td.sendResp(&response{resp: testEDSResp})
// Callback with an EDS update, the balancer will build an EDS balancer, not
// a fallback.
testXDSClient.callEDSCallback(xdsclient.ParseEDSRespProtoForTesting(testClusterLoadAssignment), nil)
expectedScStateChange := &scStateChange{
sc: &fakeSubConn{},
@ -583,7 +653,8 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) {
// lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record
// to make sure there's a new one created and get the pointer to it.
lbABuilder.clearLastBalancer()
cleanup()
// Callback with an error, the balancer should switch to fallback.
testXDSClient.callEDSCallback(nil, fmt.Errorf("xds client error"))
// switch to fallback
// fallback balancer A takes over
@ -614,24 +685,33 @@ func (s) TestXdsBalancerFallBackSignalFromEdsBalancer(t *testing.T) {
newEDSBalancer = originalNewEDSBalancer
}()
builder := balancer.Get(xdsName)
testXDSClient := newFakeXDSClient()
originalxdsclientNew := xdsclientNew
xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) {
return testXDSClient, nil
}
defer func() {
xdsclientNew = originalxdsclientNew
}()
builder := balancer.Get(edsName)
cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer)
lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
if !ok {
t.Fatalf("unable to type assert to *xdsBalancer")
t.Fatalf("unable to type assert to *edsBalancer")
}
defer lb.Close()
addr, td, _, cleanup := setupServer(t)
defer cleanup()
cfg := &XDSConfig{
BalancerName: addr,
BalancerName: "",
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
}
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg})
td.sendResp(&response{resp: testEDSResp})
// Callback with an EDS update, the balancer will build an EDS balancer, not
// a fallback.
testXDSClient.callEDSCallback(xdsclient.ParseEDSRespProtoForTesting(testClusterLoadAssignment), nil)
expectedScStateChange := &scStateChange{
sc: &fakeSubConn{},
@ -661,7 +741,8 @@ func (s) TestXdsBalancerFallBackSignalFromEdsBalancer(t *testing.T) {
// lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record
// to make sure there's a new one created and get the pointer to it.
lbABuilder.clearLastBalancer()
cleanup()
// Callback with an error, the balancer should switch to fallback.
testXDSClient.callEDSCallback(nil, fmt.Errorf("xds client error"))
// switch to fallback
// fallback balancer A takes over
@ -770,10 +851,10 @@ func TestXdsBalancerConfigParsing(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := &xdsBalancerBuilder{}
b := &edsBalancerBuilder{}
got, err := b.ParseConfig(tt.js)
if (err != nil) != tt.wantErr {
t.Errorf("xdsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("edsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !cmp.Equal(got, tt.want) {

View File

@ -46,8 +46,9 @@ type Options struct {
// various dynamic resources. A single client object will be shared by the xds
// resolver and balancer implementations.
type Client struct {
cc *grpc.ClientConn // Connection to the xDS server
v2c *v2Client // Actual xDS client implementation using the v2 API
opts Options
cc *grpc.ClientConn // Connection to the xDS server
v2c *v2Client // Actual xDS client implementation using the v2 API
mu sync.Mutex
serviceCallback func(ServiceUpdate, error)
@ -74,8 +75,9 @@ func New(opts Options) (*Client, error) {
}
c := &Client{
cc: cc,
v2c: newV2Client(cc, opts.Config.NodeProto, backoff.DefaultExponential.Backoff),
opts: opts,
cc: cc,
v2c: newV2Client(cc, opts.Config.NodeProto, backoff.DefaultExponential.Backoff),
}
return c, nil
}
@ -151,3 +153,8 @@ func (c *Client) WatchService(serviceName string, callback func(ServiceUpdate, e
c.mu.Unlock()
}
}
// WatchEDS uses the underlying v2 client to watch the EDS resources for the
// given clusterName. Updates and errors are delivered via edsCb, and the
// returned function cancels the watch.
func (c *Client) WatchEDS(clusterName string, edsCb func(*EDSUpdate, error)) (cancel func()) {
return c.v2c.watchEDS(clusterName, edsCb)
}
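A caller receives updates or errors through the callback and stops the watch with the returned function (a sketch; the Client value and cluster name are assumed):

cancelWatch := c.WatchEDS("my-cluster", func(u *EDSUpdate, err error) {
	if err != nil {
		// Watch-level error, e.g. contact with the xDS server was lost.
		return
	}
	// Apply the new endpoint assignment carried in u.
})
// ... when the watch is no longer needed:
cancelWatch()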

View File

@ -0,0 +1,63 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package client
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/xds/internal/balancer/lrs"
)
// ReportLoad sends the load of the given clusterName from loadStore to the
// given server. If the server is not an empty string, and is different from the
// xds server, a new ClientConn will be created.
//
// The same options used for creating the Client will be used (including
// NodeProto, and dial options if necessary).
//
// It returns a function to cancel the load reporting stream. If server is
// different from xds server, the ClientConn will also be closed.
func (c *Client) ReportLoad(server string, clusterName string, loadStore lrs.Store) func() {
var (
cc *grpc.ClientConn
closeCC bool
)
if server == "" || server == c.opts.Config.BalancerName {
cc = c.cc
} else {
dopts := append([]grpc.DialOption{c.opts.Config.Creds}, c.opts.DialOpts...)
ccNew, err := grpc.Dial(server, dopts...)
if err != nil {
// An error from a non-blocking dial indicates something serious.
grpclog.Infof("xds: failed to dial load report server {%s}: %v", server, err)
return func() {}
}
cc = ccNew
closeCC = true
}
ctx, cancel := context.WithCancel(context.Background())
go loadStore.ReportTo(ctx, cc, clusterName, c.opts.Config.NodeProto)
return func() {
cancel()
if closeCC {
cc.Close()
}
}
}
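Putting it together with the wrapper above, the eds balancer starts and stops reporting like this (a sketch; an empty server string reuses the existing connection to the xDS server):

cancelReport := c.ReportLoad("", "my-cluster", loadStore)
// ... when reporting should stop:
cancelReport() // cancels the stream and closes the extra ClientConn, if any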

View File

@ -204,6 +204,7 @@ func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error {
}
if cla.GetClusterName() != wi.target[0] {
grpclog.Warningf("xds: got uninteresting EDS resource, got %s, want %s", cla.GetClusterName(), wi.target[0])
// We won't validate the remaining resources. If one of the
// uninteresting ones is invalid, we will still ACK the response.
continue