From da649b362404f849b46c52e5f22009a3823575f2 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 21 Nov 2019 14:10:04 -0800 Subject: [PATCH] xds: use the shared xds client in eds balancer (#3197) This PR removes the xds_client implementation from eds balancer, and replaces it with an xds_client wrapper. (The xds_client wrapper has a very similar API to the old xds_client implementation, so the change in the eds balancer is minimal.) The eds balancer currently doesn't look for an xds_client in attributes, and always creates a new xds_client. The attributes change will be done in a follow-up change. --- xds/internal/balancer/config.go | 3 + .../balancer/edsbalancer/test_util_test.go | 4 +- xds/internal/balancer/lrs/lrs.go | 37 ++- xds/internal/balancer/lrs/lrs_test.go | 8 +- xds/internal/balancer/xds.go | 107 +++---- xds/internal/balancer/xds_client.go | 260 ------------------ xds/internal/balancer/xds_client_test.go | 169 ++---------- xds/internal/balancer/xds_client_wrapper.go | 151 ++++++++++ xds/internal/balancer/xds_lrs_test.go | 12 +- xds/internal/balancer/xds_old.go | 46 ++++ xds/internal/balancer/xds_test.go | 153 ++++++++--- xds/internal/client/client.go | 15 +- xds/internal/client/client_loadreport.go | 63 +++++ xds/internal/client/eds.go | 1 + 14 files changed, 502 insertions(+), 527 deletions(-) delete mode 100644 xds/internal/balancer/xds_client.go create mode 100644 xds/internal/balancer/xds_client_wrapper.go create mode 100644 xds/internal/balancer/xds_old.go create mode 100644 xds/internal/client/client_loadreport.go diff --git a/xds/internal/balancer/config.go b/xds/internal/balancer/config.go index 4089944e..fc2fe9a5 100644 --- a/xds/internal/balancer/config.go +++ b/xds/internal/balancer/config.go @@ -43,6 +43,9 @@ type XDSConfig struct { // LRS server to send load reports to. If not present, load reporting // will be disabled. If set to the empty string, load reporting will // be sent to the same server that we obtained CDS data from. 
+ // + // TODO: this should be a pointer to a string, so nil means load reporting + // is disabled. LrsLoadReportingServerName string } diff --git a/xds/internal/balancer/edsbalancer/test_util_test.go b/xds/internal/balancer/edsbalancer/test_util_test.go index 799fc3f7..a2b06680 100644 --- a/xds/internal/balancer/edsbalancer/test_util_test.go +++ b/xds/internal/balancer/edsbalancer/test_util_test.go @@ -26,6 +26,8 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal" + + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" ) const testSubConnsCount = 16 @@ -166,7 +168,7 @@ func (tls *testLoadStore) CallServerLoad(l internal.Locality, name string, d flo tls.callsCost = append(tls.callsCost, testServerLoad{name: name, d: d}) } -func (*testLoadStore) ReportTo(ctx context.Context, cc *grpc.ClientConn) { +func (*testLoadStore) ReportTo(ctx context.Context, cc *grpc.ClientConn, clusterName string, node *corepb.Node) { panic("not implemented") } diff --git a/xds/internal/balancer/lrs/lrs.go b/xds/internal/balancer/lrs/lrs.go index aa70d20e..7f1c1915 100644 --- a/xds/internal/balancer/lrs/lrs.go +++ b/xds/internal/balancer/lrs/lrs.go @@ -43,7 +43,8 @@ type Store interface { CallStarted(l internal.Locality) CallFinished(l internal.Locality, err error) CallServerLoad(l internal.Locality, name string, d float64) - ReportTo(ctx context.Context, cc *grpc.ClientConn) + // Report the load of clusterName to cc. + ReportTo(ctx context.Context, cc *grpc.ClientConn, clusterName string, node *corepb.Node) } type rpcCountData struct { @@ -140,7 +141,6 @@ func (rld *rpcLoadData) loadAndClear() (s float64, c uint64) { // lrsStore collects loads from xds balancer, and periodically sends load to the // server. type lrsStore struct { - serviceName string backoff backoff.Strategy lastReported time.Time @@ -149,9 +149,8 @@ type lrsStore struct { } // NewStore creates a store for load reports. 
-func NewStore(serviceName string) Store { +func NewStore() Store { return &lrsStore{ - serviceName: serviceName, backoff: backoff.DefaultExponential, lastReported: time.Now(), } @@ -284,7 +283,7 @@ func (ls *lrsStore) buildStats(clusterName string) []*endpointpb.ClusterStats { // ReportTo makes a streaming lrs call to cc and blocks. // // It retries the call (with backoff) until ctx is canceled. -func (ls *lrsStore) ReportTo(ctx context.Context, cc *grpc.ClientConn) { +func (ls *lrsStore) ReportTo(ctx context.Context, cc *grpc.ClientConn, clusterName string, node *corepb.Node) { c := lrsgrpc.NewLoadReportingServiceClient(cc) var ( retryCount int @@ -311,46 +310,46 @@ func (ls *lrsStore) ReportTo(ctx context.Context, cc *grpc.ClientConn) { doBackoff = true stream, err := c.StreamLoadStats(ctx) if err != nil { - grpclog.Infof("lrs: failed to create stream: %v", err) + grpclog.Warningf("lrs: failed to create stream: %v", err) continue } if err := stream.Send(&lrspb.LoadStatsRequest{ - // TODO: when moving this to the xds client, the Node - // field needs to be set to node from bootstrap file. - // Node: c.config.NodeProto, ClusterStats: []*endpointpb.ClusterStats{{ - // TODO: this is user's dial target now, as a temporary - // solution. Eventually this will be cluster name from CDS's response. 
- ClusterName: ls.serviceName, + ClusterName: clusterName, }}, + Node: node, }); err != nil { - grpclog.Infof("lrs: failed to send first request: %v", err) + grpclog.Warningf("lrs: failed to send first request: %v", err) continue } first, err := stream.Recv() if err != nil { - grpclog.Infof("lrs: failed to receive first response: %v", err) + grpclog.Warningf("lrs: failed to receive first response: %v", err) continue } interval, err := ptypes.Duration(first.LoadReportingInterval) if err != nil { - grpclog.Infof("lrs: failed to convert report interval: %v", err) + grpclog.Warningf("lrs: failed to convert report interval: %v", err) continue } if len(first.Clusters) != 1 { - grpclog.Infof("lrs: received multiple clusters %v, expect one cluster", first.Clusters) + grpclog.Warningf("lrs: received multiple clusters %v, expect one cluster", first.Clusters) + continue + } + if first.Clusters[0] != clusterName { + grpclog.Warningf("lrs: received cluster is unexpected. Got %v, want %v", first.Clusters[0], clusterName) continue } if first.ReportEndpointGranularity { // TODO: fixme to support per endpoint loads. - grpclog.Infof("lrs: endpoint loads requested, but not supported by current implementation") + grpclog.Warningf("lrs: endpoint loads requested, but not supported by current implementation") continue } // No backoff afterwards. 
doBackoff = false retryCount = 0 - ls.sendLoads(ctx, stream, first.Clusters[0], interval) + ls.sendLoads(ctx, stream, clusterName, interval) } } @@ -366,7 +365,7 @@ func (ls *lrsStore) sendLoads(ctx context.Context, stream lrsgrpc.LoadReportingS if err := stream.Send(&lrspb.LoadStatsRequest{ ClusterStats: ls.buildStats(clusterName), }); err != nil { - grpclog.Infof("lrs: failed to send report: %v", err) + grpclog.Warningf("lrs: failed to send report: %v", err) return } } diff --git a/xds/internal/balancer/lrs/lrs_test.go b/xds/internal/balancer/lrs/lrs_test.go index e79490c2..73e620f8 100644 --- a/xds/internal/balancer/lrs/lrs_test.go +++ b/xds/internal/balancer/lrs/lrs_test.go @@ -128,7 +128,7 @@ func Test_lrsStore_buildStats_drops(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ls := NewStore(testService).(*lrsStore) + ls := NewStore().(*lrsStore) for _, ds := range tt.drops { var ( @@ -255,7 +255,7 @@ func Test_lrsStore_buildStats_rpcCounts(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ls := NewStore(testService).(*lrsStore) + ls := NewStore().(*lrsStore) // InProgress count doesn't get cleared at each buildStats, keep // them to carry over. 
@@ -439,7 +439,7 @@ func Test_lrsStore_ReportTo(t *testing.T) { }) defer cleanup() - ls := NewStore(testService) + ls := NewStore() cc, err := grpc.Dial(addr, grpc.WithInsecure()) if err != nil { t.Fatalf("failed to dial: %v", err) @@ -448,7 +448,7 @@ func Test_lrsStore_ReportTo(t *testing.T) { defer cancel() done := make(chan struct{}) go func() { - ls.ReportTo(ctx, cc) + ls.ReportTo(ctx, cc, testService, nil) close(done) }() diff --git a/xds/internal/balancer/xds.go b/xds/internal/balancer/xds.go index 5e67250c..b3055784 100644 --- a/xds/internal/balancer/xds.go +++ b/xds/internal/balancer/xds.go @@ -39,7 +39,7 @@ import ( const ( defaultTimeout = 10 * time.Second - xdsName = "xds_experimental" + edsName = "experimental_eds" ) var ( @@ -53,15 +53,15 @@ var ( ) func init() { - balancer.Register(&xdsBalancerBuilder{}) + balancer.Register(&edsBalancerBuilder{}) } -type xdsBalancerBuilder struct{} +type edsBalancerBuilder struct{} // Build helps implement the balancer.Builder interface. -func (b *xdsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { +func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { ctx, cancel := context.WithCancel(context.Background()) - x := &xdsBalancer{ + x := &edsBalancer{ ctx: ctx, cancel: cancel, buildOpts: opts, @@ -71,7 +71,7 @@ func (b *xdsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp grpcUpdate: make(chan interface{}), xdsClientUpdate: make(chan interface{}), timer: createDrainedTimer(), // initialized a timer that won't fire without reset - loadStore: lrs.NewStore(opts.Target.Endpoint), + loadStore: lrs.NewStore(), } x.cc = &xdsClientConn{ updateState: x.connStateMgr.updateState, @@ -81,11 +81,11 @@ func (b *xdsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp return x } -func (b *xdsBalancerBuilder) Name() string { - return xdsName +func (b *edsBalancerBuilder) Name() string { + return edsName } 
-func (b *xdsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { +func (b *edsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { var cfg XDSConfig if err := json.Unmarshal(c, &cfg); err != nil { return nil, fmt.Errorf("unable to unmarshal balancer config %s into xds config", string(c)) @@ -94,7 +94,7 @@ func (b *xdsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadB } // edsBalancerInterface defines the interface that edsBalancer must implement to -// communicate with xdsBalancer. +// communicate with edsBalancer. // // It's implemented by the real eds balancer and a fake testing eds balancer. type edsBalancerInterface interface { @@ -108,11 +108,11 @@ type edsBalancerInterface interface { Close() } -var _ balancer.V2Balancer = (*xdsBalancer)(nil) // Assert that we implement V2Balancer +var _ balancer.V2Balancer = (*edsBalancer)(nil) // Assert that we implement V2Balancer -// xdsBalancer manages xdsClient and the actual balancer that does load balancing (either edsBalancer, +// edsBalancer manages xdsClient and the actual balancer that does load balancing (either edsBalancer, // or fallback LB). -type xdsBalancer struct { +type edsBalancer struct { cc balancer.ClientConn // *xdsClientConn buildOpts balancer.BuildOptions startupTimeout time.Duration @@ -120,25 +120,29 @@ type xdsBalancer struct { connStateMgr *connStateMgr ctx context.Context cancel context.CancelFunc - startup bool // startup indicates whether this xdsBalancer is in startup stage. + startup bool // startup indicates whether this edsBalancer is in startup stage. inFallbackMonitor bool - // xdsBalancer continuously monitor the channels below, and will handle events from them in sync. + // edsBalancer continuously monitor the channels below, and will handle events from them in sync. 
grpcUpdate chan interface{} xdsClientUpdate chan interface{} timer *time.Timer noSubConnAlert <-chan struct{} - client *client // may change when passed a different service config - config *XDSConfig // may change when passed a different service config + client *xdsclientWrapper // may change when passed a different service config + config *XDSConfig // may change when passed a different service config xdsLB edsBalancerInterface fallbackLB balancer.Balancer fallbackInitData *resolver.State // may change when HandleResolved address is called loadStore lrs.Store } -func (x *xdsBalancer) startNewXDSClient(u *XDSConfig) { - // If the xdsBalancer is in startup stage, then we need to apply the startup timeout for the first +// TODO: cleanup this function, or just remove it. It was here because the xds +// server name from service config can change, and we need to migrate from the +// old one to the new one. Now the xds server name is specified by the bootstrap +// file, and should never change. There's no need for this. +func (x *edsBalancer) startNewXDSClient(u *XDSConfig) { + // If the edsBalancer is in startup stage, then we need to apply the startup timeout for the first // xdsClient to get a response from the traffic director. if x.startup { x.startFallbackMonitoring() @@ -148,7 +152,7 @@ func (x *xdsBalancer) startNewXDSClient(u *XDSConfig) { // connect to it. However, previous xds client should not be closed until the new one successfully // connects to the traffic director (i.e. get an ADS response from the traffic director). Therefore, // we let each new client to be responsible to close its immediate predecessor. In this way, - // xdsBalancer does not to implement complex synchronization to achieve the same purpose. + // edsBalancer does not to implement complex synchronization to achieve the same purpose. 
prevClient := x.client // haveGotADS is true means, this xdsClient has got ADS response from director in the past, which // means it can close previous client if it hasn't and it now can send lose contact signal for @@ -189,14 +193,20 @@ func (x *xdsBalancer) startNewXDSClient(u *XDSConfig) { prevClient.close() } } - x.client = newXDSClient(u.BalancerName, u.EDSServiceName, x.buildOpts, x.loadStore, newADS, loseContact, exitCleanup) - go x.client.run() + // TODO: handle cfg.LrsLoadReportingServerName and remove log. + if u.LrsLoadReportingServerName != "" { + grpclog.Warningf("xds: lrsLoadReportingServerName is not empty, but is not handled") + } + + // TODO: stop using u.BalancerName. The value should come from bootstrap + // file. It's only used in tests now. + x.client = newXDSClientWrapper(u.BalancerName, u.EDSServiceName, x.buildOpts, u.LrsLoadReportingServerName, x.loadStore, newADS, loseContact, exitCleanup) } -// run gets executed in a goroutine once xdsBalancer is created. It monitors updates from grpc, -// xdsClient and load balancer. It synchronizes the operations that happen inside xdsBalancer. It -// exits when xdsBalancer is closed. -func (x *xdsBalancer) run() { +// run gets executed in a goroutine once edsBalancer is created. It monitors updates from grpc, +// xdsClient and load balancer. It synchronizes the operations that happen inside edsBalancer. It +// exits when edsBalancer is closed. +func (x *edsBalancer) run() { for { select { case update := <-x.grpcUpdate: @@ -222,7 +232,7 @@ func (x *xdsBalancer) run() { } } -func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { +func (x *edsBalancer) handleGRPCUpdate(update interface{}) { switch u := update.(type) { case *subConnStateUpdate: if x.xdsLB != nil { @@ -245,11 +255,6 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { var fallbackChanged bool // service config has been updated. 
if !reflect.DeepEqual(cfg, x.config) { - // TODO: handle cfg.LrsLoadReportingServerName and remove log. - if cfg.LrsLoadReportingServerName != "" { - grpclog.Warningf("xds: lrsLoadReportingServerName is not empty, but is not handled") - } - if x.config == nil { // The first time we get config, we just need to start the xdsClient. x.startNewXDSClient(cfg) @@ -262,8 +267,12 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { return } - // With a different BalancerName, we need to create a new xdsClient. - if cfg.BalancerName != x.config.BalancerName { + // Create a different xds_client if part of the config is different. + // + // TODO: this and all client related code, including comparing new + // config with old, creating new client, should be moved to a + // dedicated struct, and handled together. + if cfg.BalancerName != x.config.BalancerName || cfg.EDSServiceName != x.config.EDSServiceName || cfg.LrsLoadReportingServerName != x.config.LrsLoadReportingServerName { x.startNewXDSClient(cfg) } // We will update the xdsLB with the new child policy, if we got a @@ -301,7 +310,7 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { } } -func (x *xdsBalancer) handleXDSClientUpdate(update interface{}) { +func (x *edsBalancer) handleXDSClientUpdate(update interface{}) { switch u := update.(type) { case *edsResp: select { @@ -378,15 +387,15 @@ type subConnStateUpdate struct { state balancer.SubConnState } -func (x *xdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { +func (x *edsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { grpclog.Error("UpdateSubConnState should be called instead of HandleSubConnStateChange") } -func (x *xdsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { +func (x *edsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { grpclog.Error("UpdateResolverState should be called instead of HandleResolvedAddrs") } -func (x 
*xdsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { +func (x *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { update := &subConnStateUpdate{ sc: sc, state: state, @@ -397,11 +406,11 @@ func (x *xdsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Sub } } -func (x *xdsBalancer) ResolverError(error) { +func (x *edsBalancer) ResolverError(error) { // Ignore for now } -func (x *xdsBalancer) UpdateClientConnState(s balancer.ClientConnState) error { +func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error { select { case x.grpcUpdate <- &s: case <-x.ctx.Done(): @@ -414,7 +423,7 @@ type edsResp struct { resp *xdsclient.EDSUpdate } -func (x *xdsBalancer) newADSResponse(ctx context.Context, resp *xdsclient.EDSUpdate) error { +func (x *edsBalancer) newADSResponse(ctx context.Context, resp *xdsclient.EDSUpdate) error { select { case x.xdsClientUpdate <- &edsResp{ctx: ctx, resp: resp}: case <-x.ctx.Done(): @@ -428,7 +437,7 @@ type loseContact struct { ctx context.Context } -func (x *xdsBalancer) loseContact(ctx context.Context) { +func (x *edsBalancer) loseContact(ctx context.Context) { select { case x.xdsClientUpdate <- &loseContact{ctx: ctx}: case <-x.ctx.Done(): @@ -436,7 +445,7 @@ func (x *xdsBalancer) loseContact(ctx context.Context) { } } -func (x *xdsBalancer) switchFallback() { +func (x *edsBalancer) switchFallback() { if x.xdsLB != nil { x.xdsLB.Close() x.xdsLB = nil @@ -446,7 +455,7 @@ func (x *xdsBalancer) switchFallback() { x.cancelFallbackMonitoring() } -func (x *xdsBalancer) updateFallbackWithResolverState(s *resolver.State) { +func (x *edsBalancer) updateFallbackWithResolverState(s *resolver.State) { if lb, ok := x.fallbackLB.(balancer.V2Balancer); ok { lb.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{ Addresses: s.Addresses, @@ -462,7 +471,7 @@ func (x *xdsBalancer) updateFallbackWithResolverState(s *resolver.State) { // It 
will cancel fallback monitoring if we are in fallback monitoring stage. // If there's no running edsBalancer currently, it will create one and initialize it. Also, it will // shutdown the fallback balancer if there's one running. -func (x *xdsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() { +func (x *edsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() { // xDS update will cancel fallback monitoring if we are in fallback monitoring stage. x.cancelFallbackMonitoring() @@ -479,7 +488,7 @@ func (x *xdsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() { } } -func (x *xdsBalancer) buildFallBackBalancer(c *XDSConfig) { +func (x *edsBalancer) buildFallBackBalancer(c *XDSConfig) { if c.FallBackPolicy == nil { x.buildFallBackBalancer(&XDSConfig{ FallBackPolicy: &loadBalancingConfig{ @@ -501,7 +510,7 @@ func (x *xdsBalancer) buildFallBackBalancer(c *XDSConfig) { // timeout. // 2. After xds client loses contact with the remote, fallback if all connections to the backends are // lost (i.e. not in state READY). -func (x *xdsBalancer) startFallbackMonitoring() { +func (x *edsBalancer) startFallbackMonitoring() { if x.startup { x.startup = false x.timer.Reset(x.startupTimeout) @@ -520,7 +529,7 @@ func (x *xdsBalancer) startFallbackMonitoring() { // There are two cases where fallback monitoring should be canceled: // 1. xDS client returns a new ADS message. // 2. fallback has been triggered. 
-func (x *xdsBalancer) cancelFallbackMonitoring() { +func (x *edsBalancer) cancelFallbackMonitoring() { if !x.timer.Stop() { select { case <-x.timer.C: @@ -534,7 +543,7 @@ func (x *xdsBalancer) cancelFallbackMonitoring() { x.inFallbackMonitor = false } -func (x *xdsBalancer) Close() { +func (x *edsBalancer) Close() { x.cancel() } diff --git a/xds/internal/balancer/xds_client.go b/xds/internal/balancer/xds_client.go deleted file mode 100644 index 6aed32ff..00000000 --- a/xds/internal/balancer/xds_client.go +++ /dev/null @@ -1,260 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -package balancer - -import ( - "context" - "net" - "sync" - "time" - - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - xdsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" - "github.com/golang/protobuf/ptypes" - "google.golang.org/grpc" - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/internal/backoff" - "google.golang.org/grpc/internal/channelz" - "google.golang.org/grpc/xds/internal/balancer/lrs" - xdsclient "google.golang.org/grpc/xds/internal/client" - "google.golang.org/grpc/xds/internal/client/bootstrap" -) - -const ( - edsType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment" -) - -// client is responsible for connecting to the specified traffic director, passing the received -// ADS response from the traffic director, and sending notification when communication with the -// traffic director is lost. -type client struct { - ctx context.Context - cancel context.CancelFunc - serviceName string - cli xdsgrpc.AggregatedDiscoveryServiceClient - dialer func(context.Context, string) (net.Conn, error) - channelzParentID int64 - newADS func(ctx context.Context, resp *xdsclient.EDSUpdate) error - loseContact func(ctx context.Context) - cleanup func() - backoff backoff.Strategy - - loadStore lrs.Store - loadReportOnce sync.Once - config *bootstrap.Config - - mu sync.Mutex - cc *grpc.ClientConn -} - -func (c *client) run() { - c.dial() - c.makeADSCall() -} - -func (c *client) close() { - c.cancel() - c.mu.Lock() - if c.cc != nil { - c.cc.Close() - } - c.mu.Unlock() - c.cleanup() -} - -func (c *client) dial() { - dopts := []grpc.DialOption{c.config.Creds} - if c.dialer != nil { - dopts = append(dopts, grpc.WithContextDialer(c.dialer)) - } - // Explicitly set pickfirst as the balancer. 
- dopts = append(dopts, grpc.WithBalancerName(grpc.PickFirstBalancerName)) - if channelz.IsOn() { - dopts = append(dopts, grpc.WithChannelzParentID(c.channelzParentID)) - } - - cc, err := grpc.DialContext(c.ctx, c.config.BalancerName, dopts...) - if err != nil { - // This could fail due to ctx error, which means this client was closed. - grpclog.Warningf("xds: failed to dial: %v", err) - return - } - c.mu.Lock() - select { - case <-c.ctx.Done(): - cc.Close() - default: - // only assign c.cc when xds client has not been closed, to prevent ClientConn leak. - c.cc = cc - } - c.mu.Unlock() -} - -func (c *client) newEDSRequest() *xdspb.DiscoveryRequest { - edsReq := &xdspb.DiscoveryRequest{ - Node: c.config.NodeProto, - // TODO: the expected ResourceName could be in a different format from - // dial target. (test_service.test_namespace.traffic_director.com vs - // test_namespace:test_service). - // - // The solution today is to always set dial target in resource_names. - // - // A future solution could be: always do CDS, get cluster name from CDS - // response, and use it here. 
- // `ResourceNames: []string{c.clusterName},` - TypeUrl: edsType, - ResourceNames: []string{c.serviceName}, - } - return edsReq -} - -func (c *client) makeADSCall() { - c.cli = xdsgrpc.NewAggregatedDiscoveryServiceClient(c.cc) - retryCount := 0 - var doRetry bool - - for { - select { - case <-c.ctx.Done(): - return - default: - } - - if doRetry { - backoffTimer := time.NewTimer(c.backoff.Backoff(retryCount)) - select { - case <-backoffTimer.C: - case <-c.ctx.Done(): - backoffTimer.Stop() - return - } - retryCount++ - } - - firstRespReceived := c.adsCallAttempt() - if firstRespReceived { - retryCount = 0 - doRetry = false - } else { - doRetry = true - } - c.loseContact(c.ctx) - } -} - -func (c *client) adsCallAttempt() (firstRespReceived bool) { - firstRespReceived = false - ctx, cancel := context.WithCancel(c.ctx) - defer cancel() - st, err := c.cli.StreamAggregatedResources(ctx, grpc.WaitForReady(true)) - if err != nil { - grpclog.Infof("xds: failed to initial ADS streaming RPC due to %v", err) - return - } - if err := st.Send(c.newEDSRequest()); err != nil { - // current stream is broken, start a new one. - grpclog.Infof("xds: ads RPC failed due to err: %v, when sending the EDS request", err) - return - } - for { - resp, err := st.Recv() - if err != nil { - // current stream is broken, start a new one. - grpclog.Infof("xds: ads RPC failed due to err: %v, when receiving the response", err) - return - } - firstRespReceived = true - resources := resp.GetResources() - if len(resources) < 1 { - grpclog.Warning("xds: ADS response contains 0 resource info.") - // start a new call as server misbehaves by sending a ADS response with 0 resource info. 
- return - } - if resp.GetTypeUrl() != edsType { - grpclog.Warningf("xds: received non-EDS response: %v", resp.GetTypeUrl()) - return - } - var adsResp ptypes.DynamicAny - if err := ptypes.UnmarshalAny(resources[0], &adsResp); err != nil { - grpclog.Warningf("xds: failed to unmarshal resources due to %v.", err) - return - } - parsed, err := xdsclient.ParseEDSRespProto(adsResp.Message.(*xdspb.ClusterLoadAssignment)) - if err != nil { - grpclog.Warningf("xds: parsing new EDS message failed due to %v.", err) - return - } - if err := c.newADS(c.ctx, parsed); err != nil { - grpclog.Warningf("xds: processing new ADS message failed due to %v.", err) - return - } - // Only start load reporting after ADS resp is received. - // - // Also, newADS() will close the previous load reporting stream, so we - // don't have double reporting. - c.loadReportOnce.Do(func() { - if c.loadStore != nil { - go c.loadStore.ReportTo(c.ctx, c.cc) - } - }) - } -} - -func newXDSClient(balancerName string, edsServiceName string, opts balancer.BuildOptions, loadStore lrs.Store, newADS func(context.Context, *xdsclient.EDSUpdate) error, loseContact func(ctx context.Context), exitCleanup func()) *client { - c := &client{ - serviceName: edsServiceName, - dialer: opts.Dialer, - channelzParentID: opts.ChannelzParentID, - newADS: newADS, - loseContact: loseContact, - cleanup: exitCleanup, - backoff: backoff.DefaultExponential, - loadStore: loadStore, - } - - if c.serviceName == "" { - c.serviceName = opts.Target.Endpoint - } - - c.ctx, c.cancel = context.WithCancel(context.Background()) - - // It is possible that NewConfig returns a Config object with certain - // fields left unspecified. If so, we need to use some sane defaults here. 
- c.config = bootstrap.NewConfig() - if c.config.BalancerName == "" { - c.config.BalancerName = balancerName - } - if c.config.Creds == nil { - c.config.Creds = credsFromDefaults(balancerName, &opts) - } - return c -} - -func credsFromDefaults(balancerName string, opts *balancer.BuildOptions) grpc.DialOption { - if opts.DialCreds == nil { - grpclog.Warning("xds: no credentials available, using Insecure") - return grpc.WithInsecure() - } - if err := opts.DialCreds.OverrideServerName(balancerName); err != nil { - grpclog.Warningf("xds: failed to override the server name in credentials: %v, using Insecure", err) - return grpc.WithInsecure() - } - return grpc.WithTransportCredentials(opts.DialCreds) -} diff --git a/xds/internal/balancer/xds_client_test.go b/xds/internal/balancer/xds_client_test.go index 36778391..22a053c7 100644 --- a/xds/internal/balancer/xds_client_test.go +++ b/xds/internal/balancer/xds_client_test.go @@ -23,7 +23,6 @@ import ( "io" "net" "testing" - "time" xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" @@ -44,15 +43,15 @@ import ( xdsclient "google.golang.org/grpc/xds/internal/client" ) +const ( + edsType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment" +) + var ( - testServiceName = "test/foo" - testEDSServiceName = "test/service/eds" - testEDSReq = &xdspb.DiscoveryRequest{ - TypeUrl: edsType, - ResourceNames: []string{testServiceName}, - } + testServiceName = "test/foo" + testEDSClusterName = "test/service/eds" testClusterLoadAssignment = &xdspb.ClusterLoadAssignment{ - ClusterName: testServiceName, + ClusterName: testEDSClusterName, Endpoints: []*endpointpb.LocalityLbEndpoints{{ Locality: &corepb.Locality{ Region: "asia-east1", @@ -184,8 +183,6 @@ type testConfig struct { expectedRequests []*xdspb.DiscoveryRequest responsesToSend []*xdspb.DiscoveryResponse expectedADSResponses []proto.Message - adsErr error - svrErr error } func setupServer(t *testing.T) 
(addr string, td *testTrafficDirector, lrss *lrsServer, cleanup func()) { @@ -214,15 +211,19 @@ func setupServer(t *testing.T) (addr string, td *testTrafficDirector, lrss *lrsS func (s) TestXdsClientResponseHandling(t *testing.T) { for _, test := range []*testConfig{ { - expectedRequests: []*xdspb.DiscoveryRequest{testEDSReq}, - responsesToSend: []*xdspb.DiscoveryResponse{testEDSResp}, - expectedADSResponses: []proto.Message{testClusterLoadAssignment}, - }, - { - edsServiceName: testEDSServiceName, + // Test that if clusterName is not set, dialing target is used. expectedRequests: []*xdspb.DiscoveryRequest{{ TypeUrl: edsType, - ResourceNames: []string{testEDSServiceName}, + ResourceNames: []string{testServiceName}, // ResourceName is dialing target. + Node: &corepb.Node{}, + }}, + }, + { + edsServiceName: testEDSClusterName, + expectedRequests: []*xdspb.DiscoveryRequest{{ + TypeUrl: edsType, + ResourceNames: []string{testEDSClusterName}, + Node: &corepb.Node{}, }}, responsesToSend: []*xdspb.DiscoveryResponse{testEDSResp}, expectedADSResponses: []proto.Message{testClusterLoadAssignment}, @@ -240,9 +241,8 @@ func testXdsClientResponseHandling(t *testing.T, test *testConfig) { adsChan <- i return nil } - client := newXDSClient(addr, test.edsServiceName, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, func(context.Context) {}, func() {}) + client := newXDSClientWrapper(addr, test.edsServiceName, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, "", nil, newADS, func(context.Context) {}, func() {}) defer client.close() - go client.run() for _, expectedReq := range test.expectedRequests { req := td.getReq() @@ -250,7 +250,7 @@ func testXdsClientResponseHandling(t *testing.T, test *testConfig) { t.Fatalf("ads RPC failed with err: %v", req.err) } if !proto.Equal(req.req, expectedReq) { - t.Fatalf("got ADS request %T %v, expected: %T %v", req.req, req.req, expectedReq, expectedReq) + t.Fatalf("got ADS request 
%T, expected: %T, diff: %s", req.req, expectedReq, cmp.Diff(req.req, expectedReq, cmp.Comparer(proto.Equal))) } } @@ -266,132 +266,3 @@ func testXdsClientResponseHandling(t *testing.T, test *testConfig) { } } } - -func (s) TestXdsClientLoseContact(t *testing.T) { - for _, test := range []*testConfig{{ - responsesToSend: []*xdspb.DiscoveryResponse{testEDSResp}, - }} { - testXdsClientLoseContactRemoteClose(t, test) - } - - for _, test := range []*testConfig{{ - responsesToSend: []*xdspb.DiscoveryResponse{{ - Resources: []*anypb.Any{ - { - TypeUrl: "not-eds", - Value: marshaledClusterLoadAssignment, - }, - }, - TypeUrl: "not-eds", - }}, - }} { - testXdsClientLoseContactADSRelatedErrorOccur(t, test) - } -} - -func testXdsClientLoseContactRemoteClose(t *testing.T, test *testConfig) { - addr, td, _, cleanup := setupServer(t) - defer cleanup() - adsChan := make(chan *xdsclient.EDSUpdate, 10) - newADS := func(ctx context.Context, i *xdsclient.EDSUpdate) error { - adsChan <- i - return nil - } - contactChan := make(chan *loseContact, 10) - loseContactFunc := func(context.Context) { - contactChan <- &loseContact{} - } - client := newXDSClient(addr, test.edsServiceName, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, loseContactFunc, func() {}) - defer client.close() - go client.run() - - // make sure server side get the request (i.e stream created successfully on client side) - td.getReq() - - for _, resp := range test.responsesToSend { - td.sendResp(&response{resp: resp}) - // make sure client side receives it - <-adsChan - } - cleanup() - - select { - case <-contactChan: - case <-time.After(2 * time.Second): - t.Fatal("time out when expecting lost contact signal") - } -} - -func testXdsClientLoseContactADSRelatedErrorOccur(t *testing.T, test *testConfig) { - addr, td, _, cleanup := setupServer(t) - defer cleanup() - - adsChan := make(chan *xdsclient.EDSUpdate, 10) - newADS := func(ctx context.Context, i *xdsclient.EDSUpdate) error { 
- adsChan <- i - return test.adsErr - } - contactChan := make(chan *loseContact, 10) - loseContactFunc := func(context.Context) { - contactChan <- &loseContact{} - } - client := newXDSClient(addr, test.edsServiceName, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, loseContactFunc, func() {}) - defer client.close() - go client.run() - - // make sure server side get the request (i.e stream created successfully on client side) - td.getReq() - - for _, resp := range test.responsesToSend { - td.sendResp(&response{resp: resp}) - } - - select { - case <-contactChan: - case <-time.After(2 * time.Second): - t.Fatal("time out when expecting lost contact signal") - } -} - -func (s) TestXdsClientExponentialRetry(t *testing.T) { - cfg := &testConfig{ - svrErr: status.Errorf(codes.Aborted, "abort the stream to trigger retry"), - } - addr, td, _, cleanup := setupServer(t) - defer cleanup() - - adsChan := make(chan *xdsclient.EDSUpdate, 10) - newADS := func(ctx context.Context, i *xdsclient.EDSUpdate) error { - adsChan <- i - return nil - } - contactChan := make(chan *loseContact, 10) - loseContactFunc := func(context.Context) { - contactChan <- &loseContact{} - } - client := newXDSClient(addr, "", balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, loseContactFunc, func() {}) - defer client.close() - go client.run() - - var secondRetry, thirdRetry time.Time - for i := 0; i < 3; i++ { - // make sure server side get the request (i.e stream created successfully on client side) - td.getReq() - td.sendResp(&response{err: cfg.svrErr}) - - select { - case <-contactChan: - if i == 1 { - secondRetry = time.Now() - } - if i == 2 { - thirdRetry = time.Now() - } - case <-time.After(2 * time.Second): - t.Fatal("time out when expecting lost contact signal") - } - } - if thirdRetry.Sub(secondRetry) < 1*time.Second { - t.Fatalf("interval between second and third retry is %v, expected > 1s", thirdRetry.Sub(secondRetry)) 
- } -} diff --git a/xds/internal/balancer/xds_client_wrapper.go b/xds/internal/balancer/xds_client_wrapper.go new file mode 100644 index 00000000..46fb1b8f --- /dev/null +++ b/xds/internal/balancer/xds_client_wrapper.go @@ -0,0 +1,151 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package balancer + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/xds/internal/balancer/lrs" + xdsclient "google.golang.org/grpc/xds/internal/client" + "google.golang.org/grpc/xds/internal/client/bootstrap" +) + +// xdsClientInterface contains only the xds_client methods needed by EDS +// balancer. It's defined so we can override xdsclientNew function in tests. +type xdsClientInterface interface { + WatchEDS(clusterName string, edsCb func(*xdsclient.EDSUpdate, error)) (cancel func()) + ReportLoad(server string, clusterName string, loadStore lrs.Store) (cancel func()) + Close() +} + +var ( + xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) { + return xdsclient.New(opts) + } + bootstrapConfigNew = bootstrap.NewConfig +) + +// xdsclientWrapper is responsible for getting the xds client from attributes or +// creating a new xds client, and starting to watch EDS. The given callbacks will +// be called with EDS updates or errors.
+type xdsclientWrapper struct { + ctx context.Context + cancel context.CancelFunc + cancelWatch func() + cancelLoadReport func() + cleanup func() + + xdsclient xdsClientInterface +} + +// close cancels the wrapper's context, stops load reporting and the EDS watch, closes the xds client, and runs the cleanup callback. +func (c *xdsclientWrapper) close() { + // Cancel the context handed to the newADS/loseContact callbacks; nothing else releases it. + c.cancel() + if c.cancelLoadReport != nil { + c.cancelLoadReport() + } + if c.cancelWatch != nil { + c.cancelWatch() + } + if c.xdsclient != nil { + // TODO: this shouldn't close xdsclient if it's from attributes. + c.xdsclient.Close() + } + c.cleanup() +} + +func newXDSClientWrapper(balancerName string, edsServiceName string, bbo balancer.BuildOptions, loadReportServer string, loadStore lrs.Store, newADS func(context.Context, *xdsclient.EDSUpdate) error, loseContact func(ctx context.Context), exitCleanup func()) *xdsclientWrapper { + ctx, cancel := context.WithCancel(context.Background()) + ret := &xdsclientWrapper{ + ctx: ctx, + cancel: cancel, + cleanup: exitCleanup, + } + + // TODO: get xdsclient from Attributes instead of creating a new one. + + config := bootstrapConfigNew() + if config.BalancerName == "" { + config.BalancerName = balancerName + } + if config.Creds == nil { + // TODO: Once we start supporting a mechanism to register credential + // types, a failure to find the credential type mentioned in the + // bootstrap file should result in a failure, and not in using + // credentials from the parent channel (passed through the + // resolver.BuildOptions). + config.Creds = defaultDialCreds(config.BalancerName, bbo) + } + + var dopts []grpc.DialOption + if bbo.Dialer != nil { + dopts = []grpc.DialOption{grpc.WithContextDialer(bbo.Dialer)} + } + + c, err := xdsclientNew(xdsclient.Options{Config: *config, DialOpts: dopts}) + if err != nil { + grpclog.Warningf("failed to create xdsclient, error: %v", err) + return ret + } + ret.xdsclient = c + + // The clusterName to watch should come from CDS response, via service + // config. If it's an empty string, fall back to the user's dial target.
+ nameToWatch := edsServiceName + if nameToWatch == "" { + grpclog.Warningf("eds: cluster name to watch is an empty string. Fallback to user's dial target") + nameToWatch = bbo.Target.Endpoint + } + ret.cancelWatch = ret.xdsclient.WatchEDS(nameToWatch, func(update *xdsclient.EDSUpdate, err error) { + if err != nil { + loseContact(ret.ctx) + return + } + if err := newADS(ret.ctx, update); err != nil { + grpclog.Warningf("xds: processing new EDS update failed due to %v.", err) + } + }) + if loadStore != nil { + ret.cancelLoadReport = ret.xdsclient.ReportLoad(loadReportServer, nameToWatch, loadStore) + } + return ret +} + +// defaultDialCreds builds a DialOption containing the credentials to be used +// while talking to the xDS server (this is done only if the xds bootstrap +// process does not return any credentials to use). If the parent channel +// contains DialCreds, we use it as is. If it contains a CredsBundle, we use +// just the transport credentials from the bundle. If we don't find any +// credentials on the parent channel, we resort to using an insecure channel. 
+func defaultDialCreds(balancerName string, rbo balancer.BuildOptions) grpc.DialOption { + switch { + case rbo.DialCreds != nil: + if err := rbo.DialCreds.OverrideServerName(balancerName); err != nil { + grpclog.Warningf("xds: failed to override server name in credentials: %v, using Insecure", err) + return grpc.WithInsecure() + } + return grpc.WithTransportCredentials(rbo.DialCreds) + case rbo.CredsBundle != nil: + return grpc.WithTransportCredentials(rbo.CredsBundle.TransportCredentials()) + default: + grpclog.Warning("xds: no credentials available, using Insecure") + return grpc.WithInsecure() + } +} diff --git a/xds/internal/balancer/xds_lrs_test.go b/xds/internal/balancer/xds_lrs_test.go index 543f6728..89a99cd5 100644 --- a/xds/internal/balancer/xds_lrs_test.go +++ b/xds/internal/balancer/xds_lrs_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" @@ -50,13 +51,14 @@ func (lrss *lrsServer) StreamLoadStats(stream lrsgrpc.LoadReportingService_Strea } if !proto.Equal(req, &lrspb.LoadStatsRequest{ ClusterStats: []*endpointpb.ClusterStats{{ - ClusterName: testServiceName, + ClusterName: testEDSClusterName, }}, + Node: &corepb.Node{}, }) { return status.Errorf(codes.FailedPrecondition, "unexpected req: %+v", req) } if err := stream.Send(&lrspb.LoadStatsResponse{ - Clusters: []string{testServiceName}, + Clusters: []string{testEDSClusterName}, LoadReportingInterval: lrss.reportingInterval, }); err != nil { return err @@ -87,11 +89,11 @@ func (s) TestXdsLoadReporting(t *testing.T) { newEDSBalancer = originalNewEDSBalancer }() - builder := balancer.Get(xdsName) + builder := balancer.Get(edsName) cc := newTestClientConn() - lb, ok := builder.Build(cc, balancer.BuildOptions{Target: 
resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer) + lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) if !ok { - t.Fatalf("unable to type assert to *xdsBalancer") + t.Fatalf("unable to type assert to *edsBalancer") } defer lb.Close() diff --git a/xds/internal/balancer/xds_old.go b/xds/internal/balancer/xds_old.go new file mode 100644 index 00000000..1d371bd4 --- /dev/null +++ b/xds/internal/balancer/xds_old.go @@ -0,0 +1,46 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package balancer + +import "google.golang.org/grpc/balancer" + +// The old xds balancer implements logic for both CDS and EDS. With the new +// design, CDS is split and moved to a separate balancer, and the xds balancer +// becomes the EDS balancer. +// +// To keep the existing tests working, this file registers the EDS balancer under the +// old xds balancer name. +// +// TODO: delete this file when migration to new workflow (LDS, RDS, CDS, EDS) is +// done. + +const xdsName = "xds_experimental" + +func init() { + balancer.Register(&xdsBalancerBuilder{}) +} + +// xdsBalancerBuilder registers edsBalancerBuilder (now with name +// "experimental_eds") under the old name "xds_experimental".
+type xdsBalancerBuilder struct { + edsBalancerBuilder +} + +func (b *xdsBalancerBuilder) Name() string { + return xdsName +} diff --git a/xds/internal/balancer/xds_test.go b/xds/internal/balancer/xds_test.go index 08ca16d0..bc093e26 100644 --- a/xds/internal/balancer/xds_test.go +++ b/xds/internal/balancer/xds_test.go @@ -21,15 +21,18 @@ package balancer import ( "bytes" "encoding/json" + "fmt" "reflect" "sync" "testing" "time" xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" "github.com/golang/protobuf/jsonpb" wrapperspb "github.com/golang/protobuf/ptypes/wrappers" "github.com/google/go-cmp/cmp" + "google.golang.org/grpc" "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/grpctest" @@ -39,14 +42,23 @@ import ( "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/lrs" xdsclient "google.golang.org/grpc/xds/internal/client" + "google.golang.org/grpc/xds/internal/client/bootstrap" ) var lbABuilder = &balancerABuilder{} func init() { - balancer.Register(&xdsBalancerBuilder{}) + balancer.Register(&edsBalancerBuilder{}) balancer.Register(lbABuilder) balancer.Register(&balancerBBuilder{}) + + bootstrapConfigNew = func() *bootstrap.Config { + return &bootstrap.Config{ + BalancerName: "", + Creds: grpc.WithInsecure(), + NodeProto: &corepb.Node{}, + } + } } type s struct{} @@ -70,6 +82,7 @@ var ( BalancerName: testBalancerNameFooBar, ChildPolicy: &loadBalancingConfig{Name: fakeBalancerB}, FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, + EDSServiceName: testEDSClusterName, } specialAddrForBalancerA = resolver.Address{Addr: "this.is.balancer.A"} @@ -233,6 +246,40 @@ type fakeSubConn struct{} func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") } func (*fakeSubConn) Connect() { panic("implement me") } +type fakeXDSClient struct { + edsCbReceived chan struct{} // Will be closed 
when WatchEDS is called. + edsCb func(*xdsclient.EDSUpdate, error) +} + +func newFakeXDSClient() *fakeXDSClient { + return &fakeXDSClient{edsCbReceived: make(chan struct{})} +} + +func (c *fakeXDSClient) WatchEDS(clusterName string, edsCb func(*xdsclient.EDSUpdate, error)) (cancel func()) { + c.edsCb = edsCb + // WatchEDS is expected to be only called once in the test. If a test needs + // to call it multiple times, this will panic. + close(c.edsCbReceived) + return func() {} +} + +func (c *fakeXDSClient) callEDSCallback(u *xdsclient.EDSUpdate, err error) { + t := time.NewTimer(time.Second) + select { + case <-c.edsCbReceived: + t.Stop() + case <-t.C: + panic("EDS callback is not received after 1 second") + } + c.edsCb(u, err) +} + +func (c *fakeXDSClient) ReportLoad(server string, clusterName string, loadStore lrs.Store) (cancel func()) { + return func() {} +} + +func (c *fakeXDSClient) Close() {} + // TestXdsFallbackResolvedAddrs verifies that the fallback balancer specified // in the provided lbconfig is initialized, and that it receives the addresses // pushed by the resolver. 
@@ -252,12 +299,12 @@ func (s) TestXdsFallbackResolvedAddrs(t *testing.T) { startupTimeout = 500 * time.Millisecond defer func() { startupTimeout = defaultTimeout }() - builder := balancer.Get(xdsName) + builder := balancer.Get(edsName) cc := newTestClientConn() b := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}) - lb, ok := b.(*xdsBalancer) + lb, ok := b.(*edsBalancer) if !ok { - t.Fatalf("builder.Build() returned a balancer of type %T, want *xdsBalancer", b) + t.Fatalf("builder.Build() returned a balancer of type %T, want *edsBalancer", b) } defer lb.Close() @@ -300,11 +347,11 @@ func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) { newEDSBalancer = originalNewEDSBalancer }() - builder := balancer.Get(xdsName) + builder := balancer.Get(edsName) cc := newTestClientConn() - lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer) + lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer) if !ok { - t.Fatalf("unable to type assert to *xdsBalancer") + t.Fatalf("unable to type assert to *edsBalancer") } defer lb.Close() addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} @@ -338,6 +385,7 @@ func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) { BalancerName: addr, ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, + EDSServiceName: testEDSClusterName, } lb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: addrs}, @@ -378,11 +426,11 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) { newEDSBalancer = originalNewEDSBalancer }() - builder := balancer.Get(xdsName) + builder := balancer.Get(edsName) cc := newTestClientConn() - lb, ok := builder.Build(cc, balancer.BuildOptions{Target: 
resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer) + lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer) if !ok { - t.Fatalf("unable to type assert to *xdsBalancer") + t.Fatalf("unable to type assert to *edsBalancer") } defer lb.Close() @@ -403,6 +451,7 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) { Name: fakeBalancerA, Config: json.RawMessage("{}"), }, + EDSServiceName: testEDSClusterName, }, responseToSend: testEDSResp, expectedChildPolicy: &loadBalancingConfig{ @@ -416,6 +465,7 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) { Name: fakeBalancerB, Config: json.RawMessage("{}"), }, + EDSServiceName: testEDSClusterName, }, expectedChildPolicy: &loadBalancingConfig{ Name: string(fakeBalancerB), @@ -461,18 +511,25 @@ func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) { newEDSBalancer = originalNewEDSBalancer }() - builder := balancer.Get(xdsName) + testXDSClient := newFakeXDSClient() + originalxdsclientNew := xdsclientNew + xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) { + return testXDSClient, nil + } + defer func() { + xdsclientNew = originalxdsclientNew + }() + + builder := balancer.Get(edsName) cc := newTestClientConn() - lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer) + lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) if !ok { - t.Fatalf("unable to type assert to *xdsBalancer") + t.Fatalf("unable to type assert to *edsBalancer") } defer lb.Close() - addr, td, _, cleanup := setupServer(t) - cfg := XDSConfig{ - BalancerName: addr, + BalancerName: "", ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, } @@ -486,7 +543,9 @@ func (s) 
TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) { BalancerConfig: &cfg2, }) - td.sendResp(&response{resp: testEDSResp}) + // Callback with an EDS update, the balancer will build a EDS balancer, not + // a fallback. + testXDSClient.callEDSCallback(xdsclient.ParseEDSRespProtoForTesting(testClusterLoadAssignment), nil) var i int for i = 0; i < 10; i++ { @@ -499,7 +558,8 @@ func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) { t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s") } - cleanup() + // Callback with an error, the balancer should switch to fallback. + testXDSClient.callEDSCallback(nil, fmt.Errorf("xds client error")) // verify fallback balancer B takes over select { @@ -536,24 +596,34 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) { newEDSBalancer = originalNewEDSBalancer }() - builder := balancer.Get(xdsName) + testXDSClient := newFakeXDSClient() + originalxdsclientNew := xdsclientNew + xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) { + return testXDSClient, nil + } + defer func() { + xdsclientNew = originalxdsclientNew + }() + + builder := balancer.Get(edsName) cc := newTestClientConn() - lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer) + lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) if !ok { - t.Fatalf("unable to type assert to *xdsBalancer") + t.Fatalf("unable to type assert to *edsBalancer") } defer lb.Close() - addr, td, _, cleanup := setupServer(t) - defer cleanup() cfg := &XDSConfig{ - BalancerName: addr, + BalancerName: "", ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, + EDSServiceName: testEDSClusterName, } lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg}) - td.sendResp(&response{resp: testEDSResp}) + // 
Callback with an EDS update, the balancer will build a EDS balancer, not + // a fallback. + testXDSClient.callEDSCallback(xdsclient.ParseEDSRespProtoForTesting(testClusterLoadAssignment), nil) expectedScStateChange := &scStateChange{ sc: &fakeSubConn{}, @@ -583,7 +653,8 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) { // lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record // to make sure there's a new one created and get the pointer to it. lbABuilder.clearLastBalancer() - cleanup() + // Callback with an error, the balancer should switch to fallback. + testXDSClient.callEDSCallback(nil, fmt.Errorf("xds client error")) // switch to fallback // fallback balancer A takes over @@ -614,24 +685,33 @@ func (s) TestXdsBalancerFallBackSignalFromEdsBalancer(t *testing.T) { newEDSBalancer = originalNewEDSBalancer }() - builder := balancer.Get(xdsName) + testXDSClient := newFakeXDSClient() + originalxdsclientNew := xdsclientNew + xdsclientNew = func(opts xdsclient.Options) (xdsClientInterface, error) { + return testXDSClient, nil + } + defer func() { + xdsclientNew = originalxdsclientNew + }() + + builder := balancer.Get(edsName) cc := newTestClientConn() - lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*xdsBalancer) + lb, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) if !ok { - t.Fatalf("unable to type assert to *xdsBalancer") + t.Fatalf("unable to type assert to *edsBalancer") } defer lb.Close() - addr, td, _, cleanup := setupServer(t) - defer cleanup() cfg := &XDSConfig{ - BalancerName: addr, + BalancerName: "", ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, } lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg}) - td.sendResp(&response{resp: testEDSResp}) + // Callback with an EDS update, the 
balancer will build a EDS balancer, not + // a fallback. + testXDSClient.callEDSCallback(xdsclient.ParseEDSRespProtoForTesting(testClusterLoadAssignment), nil) expectedScStateChange := &scStateChange{ sc: &fakeSubConn{}, @@ -661,7 +741,8 @@ func (s) TestXdsBalancerFallBackSignalFromEdsBalancer(t *testing.T) { // lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record // to make sure there's a new one created and get the pointer to it. lbABuilder.clearLastBalancer() - cleanup() + // Callback with an error, the balancer should switch to fallback. + testXDSClient.callEDSCallback(nil, fmt.Errorf("xds client error")) // switch to fallback // fallback balancer A takes over @@ -770,10 +851,10 @@ func TestXdsBalancerConfigParsing(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - b := &xdsBalancerBuilder{} + b := &edsBalancerBuilder{} got, err := b.ParseConfig(tt.js) if (err != nil) != tt.wantErr { - t.Errorf("xdsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("edsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr) return } if !cmp.Equal(got, tt.want) { diff --git a/xds/internal/client/client.go b/xds/internal/client/client.go index b94ce751..5776fbdf 100644 --- a/xds/internal/client/client.go +++ b/xds/internal/client/client.go @@ -46,8 +46,9 @@ type Options struct { // various dynamic resources. A single client object will be shared by the xds // resolver and balancer implementations. 
type Client struct { - cc *grpc.ClientConn // Connection to the xDS server - v2c *v2Client // Actual xDS client implementation using the v2 API + opts Options + cc *grpc.ClientConn // Connection to the xDS server + v2c *v2Client // Actual xDS client implementation using the v2 API mu sync.Mutex serviceCallback func(ServiceUpdate, error) @@ -74,8 +75,9 @@ func New(opts Options) (*Client, error) { } c := &Client{ - cc: cc, - v2c: newV2Client(cc, opts.Config.NodeProto, backoff.DefaultExponential.Backoff), + opts: opts, + cc: cc, + v2c: newV2Client(cc, opts.Config.NodeProto, backoff.DefaultExponential.Backoff), } return c, nil } @@ -151,3 +153,8 @@ func (c *Client) WatchService(serviceName string, callback func(ServiceUpdate, e c.mu.Unlock() } } + +// WatchEDS starts an EDS watch for clusterName on the underlying v2 client; edsCb is passed along to receive the updates or errors, and the returned function cancels the watch. +func (c *Client) WatchEDS(clusterName string, edsCb func(*EDSUpdate, error)) (cancel func()) { + return c.v2c.watchEDS(clusterName, edsCb) +} diff --git a/xds/internal/client/client_loadreport.go b/xds/internal/client/client_loadreport.go new file mode 100644 index 00000000..09313ba2 --- /dev/null +++ b/xds/internal/client/client_loadreport.go @@ -0,0 +1,63 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package client + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/xds/internal/balancer/lrs" +) + +// ReportLoad sends the load of the given clusterName from loadStore to the +// given server. If the server is not an empty string, and is different from the +// xds server, a new ClientConn will be created. +// +// The same options used for creating the Client will be used (including +// NodeProto, and dial options if necessary). +// +// It returns a function to cancel the load reporting stream. If server is +// different from xds server, the ClientConn will also be closed. +func (c *Client) ReportLoad(server string, clusterName string, loadStore lrs.Store) func() { + var ( + cc *grpc.ClientConn + closeCC bool + ) + if server == "" || server == c.opts.Config.BalancerName { + cc = c.cc + } else { + dopts := append([]grpc.DialOption{c.opts.Config.Creds}, c.opts.DialOpts...) + ccNew, err := grpc.Dial(server, dopts...) + if err != nil { + // An error from a non-blocking dial indicates something serious. + grpclog.Infof("xds: failed to dial load report server {%s}: %v", server, err) + return func() {} + } + cc = ccNew + closeCC = true + } + ctx, cancel := context.WithCancel(context.Background()) + go loadStore.ReportTo(ctx, cc, clusterName, c.opts.Config.NodeProto) + return func() { + cancel() + if closeCC { + cc.Close() + } + } +} diff --git a/xds/internal/client/eds.go b/xds/internal/client/eds.go index 26ca1f8a..9f7f5722 100644 --- a/xds/internal/client/eds.go +++ b/xds/internal/client/eds.go @@ -204,6 +204,7 @@ func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error { } if cla.GetClusterName() != wi.target[0] { + grpclog.Warningf("xds: got uninteresting EDS resource, got %s, want %s", cla.GetClusterName(), wi.target[0]) // We won't validate the remaining resources. If one of the // uninteresting ones is invalid, we will still ACK the response. continue