From d8e3da36ac481ef00e510ca119f6b68177713689 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 25 Nov 2019 14:50:28 -0800 Subject: [PATCH] xds: in EDS balancer, use xds_client from attibutes (#3205) --- xds/internal/balancer/xds.go | 172 ++++---------- xds/internal/balancer/xds_client_test.go | 217 +++++++++++++++-- xds/internal/balancer/xds_client_wrapper.go | 244 +++++++++++++++----- xds/internal/balancer/xds_test.go | 27 ++- 4 files changed, 447 insertions(+), 213 deletions(-) diff --git a/xds/internal/balancer/xds.go b/xds/internal/balancer/xds.go index b3055784..5cd46daf 100644 --- a/xds/internal/balancer/xds.go +++ b/xds/internal/balancer/xds.go @@ -28,6 +28,7 @@ import ( "time" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/resolver" @@ -77,6 +78,7 @@ func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp updateState: x.connStateMgr.updateState, ClientConn: cc, } + x.client = newXDSClientWrapper(x.handleEDSUpdate, x.loseContact, x.buildOpts, x.loadStore) go x.run() return x } @@ -137,72 +139,6 @@ type edsBalancer struct { loadStore lrs.Store } -// TODO: cleanup this function, or just remove it. It was here because the xds -// server name from service config can change, and we need to migrate from the -// old one to the new one. Now the xds server name is specified by the bootstrap -// file, and should never change. There's no need for this. -func (x *edsBalancer) startNewXDSClient(u *XDSConfig) { - // If the edsBalancer is in startup stage, then we need to apply the startup timeout for the first - // xdsClient to get a response from the traffic director. - if x.startup { - x.startFallbackMonitoring() - } - - // Whenever service config gives a new traffic director name, we need to create an xds client to - // connect to it. However, previous xds client should not be closed until the new one successfully - // connects to the traffic director (i.e. get an ADS response from the traffic director). Therefore, - // we let each new client to be responsible to close its immediate predecessor. In this way, - // edsBalancer does not to implement complex synchronization to achieve the same purpose. - prevClient := x.client - // haveGotADS is true means, this xdsClient has got ADS response from director in the past, which - // means it can close previous client if it hasn't and it now can send lose contact signal for - // fallback monitoring. - var haveGotADS bool - - // set up callbacks for the xds client. - newADS := func(ctx context.Context, resp *xdsclient.EDSUpdate) error { - if !haveGotADS { - if prevClient != nil { - prevClient.close() - } - haveGotADS = true - } - return x.newADSResponse(ctx, resp) - } - loseContact := func(ctx context.Context) { - // loseContact signal is only useful when the current xds client has received ADS response before, - // and has not been closed by later xds client. - if haveGotADS { - select { - case <-ctx.Done(): - return - default: - } - x.loseContact(ctx) - } - } - exitCleanup := func() { - // Each xds client is responsible to close its predecessor if there's one. There are two paths - // for a xds client to close its predecessor: - // 1. Once it receives its first ADS response. - // 2. It hasn't received its first ADS response yet, but its own successor has received ADS - // response (which triggers the exit of it). Therefore, it needs to close its predecessor if - // it has one. - // Here the exitCleanup is for the 2nd path. - if !haveGotADS && prevClient != nil { - prevClient.close() - } - } - // TODO: handle cfg.LrsLoadReportingServerName and remove log. - if u.LrsLoadReportingServerName != "" { - grpclog.Warningf("xds: lrsLoadReportingServerName is not empty, but is not handled") - } - - // TODO: stop using u.BalancerName. The value should come from bootstrap - // file. It's only used in tests now. - x.client = newXDSClientWrapper(u.BalancerName, u.EDSServiceName, x.buildOpts, u.LrsLoadReportingServerName, x.loadStore, newADS, loseContact, exitCleanup) -} - // run gets executed in a goroutine once edsBalancer is created. It monitors updates from grpc, // xdsClient and load balancer. It synchronizes the operations that happen inside edsBalancer. It // exits when edsBalancer is closed. @@ -252,44 +188,39 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) { return } + x.client.handleUpdate(cfg, u.ResolverState.Attributes) + + if x.config == nil { + // If this is the first config, the edsBalancer is in startup stage. + // We need to apply the startup timeout for the first xdsClient, in + // case it doesn't get a response from the traffic director in time. + if x.startup { + x.startFallbackMonitoring() + } + x.config = cfg + x.fallbackInitData = &resolver.State{ + Addresses: u.ResolverState.Addresses, + // TODO(yuxuanli): get the fallback balancer config once the validation change completes, where + // we can pass along the config struct. + } + return + } + + // We will update the xdsLB with the new child policy, if we got a + // different one. + if x.xdsLB != nil && !reflect.DeepEqual(cfg.ChildPolicy, x.config.ChildPolicy) { + if cfg.ChildPolicy != nil { + x.xdsLB.HandleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config) + } else { + x.xdsLB.HandleChildPolicy(roundrobin.Name, nil) + } + } + var fallbackChanged bool - // service config has been updated. - if !reflect.DeepEqual(cfg, x.config) { - if x.config == nil { - // The first time we get config, we just need to start the xdsClient. - x.startNewXDSClient(cfg) - x.config = cfg - x.fallbackInitData = &resolver.State{ - Addresses: u.ResolverState.Addresses, - // TODO(yuxuanli): get the fallback balancer config once the validation change completes, where - // we can pass along the config struct. - } - return - } - - // Create a different xds_client if part of the config is different. - // - // TODO: this and all client related code, including comparing new - // config with old, creating new client, should be moved to a - // dedicated struct, and handled together. - if cfg.BalancerName != x.config.BalancerName || cfg.EDSServiceName != x.config.EDSServiceName || cfg.LrsLoadReportingServerName != x.config.LrsLoadReportingServerName { - x.startNewXDSClient(cfg) - } - // We will update the xdsLB with the new child policy, if we got a - // different one. - if x.xdsLB != nil && !reflect.DeepEqual(cfg.ChildPolicy, x.config.ChildPolicy) { - if cfg.ChildPolicy != nil { - x.xdsLB.HandleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config) - } else { - x.xdsLB.HandleChildPolicy("round_robin", cfg.ChildPolicy.Config) - } - } - - if x.fallbackLB != nil && !reflect.DeepEqual(cfg.FallBackPolicy, x.config.FallBackPolicy) { - x.fallbackLB.Close() - x.buildFallBackBalancer(cfg) - fallbackChanged = true - } + if x.fallbackLB != nil && !reflect.DeepEqual(cfg.FallBackPolicy, x.config.FallBackPolicy) { + x.fallbackLB.Close() + x.buildFallBackBalancer(cfg) + fallbackChanged = true } if x.fallbackLB != nil && (!reflect.DeepEqual(x.fallbackInitData.Addresses, u.ResolverState.Addresses) || fallbackChanged) { @@ -312,20 +243,12 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) { func (x *edsBalancer) handleXDSClientUpdate(update interface{}) { switch u := update.(type) { - case *edsResp: - select { - case <-u.ctx.Done(): - return - default: - } + // TODO: this func should accept (*xdsclient.EDSUpdate, error), and process + // the error, instead of having a separate loseContact signal. + case *xdsclient.EDSUpdate: x.cancelFallbackAndSwitchEDSBalancerIfNecessary() - x.xdsLB.HandleEDSResponse(u.resp) + x.xdsLB.HandleEDSResponse(u) case *loseContact: - select { - case <-u.ctx.Done(): - return - default: - } // if we are already doing fallback monitoring, then we ignore new loseContact signal. if x.inFallbackMonitor { return @@ -418,30 +341,25 @@ func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error { return nil } -type edsResp struct { - ctx context.Context - resp *xdsclient.EDSUpdate -} - -func (x *edsBalancer) newADSResponse(ctx context.Context, resp *xdsclient.EDSUpdate) error { +func (x *edsBalancer) handleEDSUpdate(resp *xdsclient.EDSUpdate) error { + // TODO: this function should take (resp, error), and send them together on + // the channel. There doesn't need to be a separate `loseContact` function. select { - case x.xdsClientUpdate <- &edsResp{ctx: ctx, resp: resp}: + case x.xdsClientUpdate <- resp: case <-x.ctx.Done(): - case <-ctx.Done(): } return nil } type loseContact struct { - ctx context.Context } -func (x *edsBalancer) loseContact(ctx context.Context) { +// TODO: delete loseContact when handleEDSUpdate takes (resp, error). +func (x *edsBalancer) loseContact() { select { - case x.xdsClientUpdate <- &loseContact{ctx: ctx}: + case x.xdsClientUpdate <- &loseContact{}: case <-x.ctx.Done(): - case <-ctx.Done(): } } diff --git a/xds/internal/balancer/xds_client_test.go b/xds/internal/balancer/xds_client_test.go index 22a053c7..75b18129 100644 --- a/xds/internal/balancer/xds_client_test.go +++ b/xds/internal/balancer/xds_client_test.go @@ -19,8 +19,6 @@ package balancer import ( - "context" - "io" "net" "testing" @@ -36,11 +34,14 @@ import ( wrpb "github.com/golang/protobuf/ptypes/wrappers" "github.com/google/go-cmp/cmp" "google.golang.org/grpc" + "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/resolver" "google.golang.org/grpc/status" + xdsinternal "google.golang.org/grpc/xds/internal" xdsclient "google.golang.org/grpc/xds/internal/client" + "google.golang.org/grpc/xds/internal/client/bootstrap" ) const ( @@ -107,6 +108,8 @@ var ( } ) +// TODO: remove all usage of testTrafficDirector, and use fakexds server +// instead. type testTrafficDirector struct { reqChan chan *request respChan chan *response @@ -123,26 +126,22 @@ type response struct { } func (ttd *testTrafficDirector) StreamAggregatedResources(s xdsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { - for { - req, err := s.Recv() - if err != nil { + go func() { + for { + req, err := s.Recv() + if err != nil { + ttd.reqChan <- &request{ + req: nil, + err: err, + } + return + } ttd.reqChan <- &request{ - req: nil, - err: err, + req: req, + err: nil, } - if err == io.EOF { - return nil - } - return err } - ttd.reqChan <- &request{ - req: req, - err: nil, - } - if req.TypeUrl == edsType { - break - } - } + }() for { select { @@ -237,12 +236,17 @@ func testXdsClientResponseHandling(t *testing.T, test *testConfig) { addr, td, _, cleanup := setupServer(t) defer cleanup() adsChan := make(chan *xdsclient.EDSUpdate, 10) - newADS := func(ctx context.Context, i *xdsclient.EDSUpdate) error { + newADS := func(i *xdsclient.EDSUpdate) error { adsChan <- i return nil } - client := newXDSClientWrapper(addr, test.edsServiceName, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, "", nil, newADS, func(context.Context) {}, func() {}) + client := newXDSClientWrapper(newADS, func() {}, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil) defer client.close() + client.handleUpdate(&XDSConfig{ + BalancerName: addr, + EDSServiceName: test.edsServiceName, + LrsLoadReportingServerName: "", + }, nil) for _, expectedReq := range test.expectedRequests { req := td.getReq() @@ -266,3 +270,174 @@ func testXdsClientResponseHandling(t *testing.T, test *testConfig) { } } } + +// Test that if xds_client is in attributes, the xdsclientnew function will not +// be called, and the xds_client from attributes will be used. +// +// And also that when xds_client in attributes is updated, the new one will be +// used, and watch will be restarted. +func (s) TestXdsClientInAttributes(t *testing.T) { + adsChan := make(chan *xdsclient.EDSUpdate, 10) + newADS := func(i *xdsclient.EDSUpdate) error { + adsChan <- i + return nil + } + + oldxdsclientNew := xdsclientNew + xdsclientNew = func(opts xdsclient.Options) (clientInterface xdsClientInterface, e error) { + t.Fatalf("unexpected call to xdsclientNew when xds_client is set in attributes") + return nil, nil + } + defer func() { xdsclientNew = oldxdsclientNew }() + + addr, td, _, cleanup := setupServer(t) + defer cleanup() + // Create a client to be passed in attributes. + c, _ := oldxdsclientNew(xdsclient.Options{ + Config: bootstrap.Config{ + BalancerName: addr, + Creds: grpc.WithInsecure(), + NodeProto: &corepb.Node{}, + }, + }) + // Need to manually close c because xdsclientWrapper won't close it (it's + // from attributes). + defer c.Close() + + client := newXDSClientWrapper(newADS, func() {}, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil) + defer client.close() + + client.handleUpdate( + &XDSConfig{EDSServiceName: testEDSClusterName, LrsLoadReportingServerName: ""}, + attributes.New(xdsinternal.XDSClientID, c), + ) + + expectedReq := &xdspb.DiscoveryRequest{ + TypeUrl: edsType, + ResourceNames: []string{testEDSClusterName}, + Node: &corepb.Node{}, + } + + // Make sure the requests are sent to the correct td. + req := td.getReq() + if req.err != nil { + t.Fatalf("ads RPC failed with err: %v", req.err) + } + if !proto.Equal(req.req, expectedReq) { + t.Fatalf("got ADS request %T, expected: %T, diff: %s", req.req, expectedReq, cmp.Diff(req.req, expectedReq, cmp.Comparer(proto.Equal))) + } + + addr2, td2, _, cleanup2 := setupServer(t) + defer cleanup2() + // Create a client to be passed in attributes. + c2, _ := oldxdsclientNew(xdsclient.Options{ + Config: bootstrap.Config{ + BalancerName: addr2, + Creds: grpc.WithInsecure(), + NodeProto: &corepb.Node{}, + }, + }) + // Need to manually close c because xdsclientWrapper won't close it (it's + // from attributes). + defer c2.Close() + + // Update with a new xds_client in attributes. + client.handleUpdate( + &XDSConfig{EDSServiceName: "", LrsLoadReportingServerName: ""}, + attributes.New(xdsinternal.XDSClientID, c2), + ) + + expectedReq2 := &xdspb.DiscoveryRequest{ + TypeUrl: edsType, + // The edsServiceName in new update is an empty string, user's dial + // target should be used as eds name to watch. + ResourceNames: []string{testServiceName}, + Node: &corepb.Node{}, + } + + // Make sure the requests are sent to the correct td. + req2 := td2.getReq() + if req.err != nil { + t.Fatalf("ads RPC failed with err: %v", req.err) + } + if !proto.Equal(req2.req, expectedReq2) { + t.Fatalf("got ADS request %T, expected: %T, diff: %s", req2.req, expectedReq, cmp.Diff(req2.req, expectedReq2, cmp.Comparer(proto.Equal))) + } +} + +// Test that when edsServiceName from service config is updated, the new one +// will be watched. +func (s) TestEDSServiceNameUpdate(t *testing.T) { + adsChan := make(chan *xdsclient.EDSUpdate, 10) + newADS := func(i *xdsclient.EDSUpdate) error { + adsChan <- i + return nil + } + + oldxdsclientNew := xdsclientNew + xdsclientNew = func(opts xdsclient.Options) (clientInterface xdsClientInterface, e error) { + t.Fatalf("unexpected call to xdsclientNew when xds_client is set in attributes") + return nil, nil + } + defer func() { xdsclientNew = oldxdsclientNew }() + + addr, td, _, cleanup := setupServer(t) + defer cleanup() + // Create a client to be passed in attributes. + c, _ := oldxdsclientNew(xdsclient.Options{ + Config: bootstrap.Config{ + BalancerName: addr, + Creds: grpc.WithInsecure(), + NodeProto: &corepb.Node{}, + }, + }) + // Need to manually close c because xdsclientWrapper won't close it (it's + // from attributes). + defer c.Close() + + client := newXDSClientWrapper(newADS, func() {}, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil) + defer client.close() + + client.handleUpdate( + &XDSConfig{EDSServiceName: testEDSClusterName, LrsLoadReportingServerName: ""}, + attributes.New(xdsinternal.XDSClientID, c), + ) + + expectedReq := &xdspb.DiscoveryRequest{ + TypeUrl: edsType, + ResourceNames: []string{testEDSClusterName}, + Node: &corepb.Node{}, + } + + // Make sure the requests are sent to the correct td. + req := td.getReq() + if req.err != nil { + t.Fatalf("ads RPC failed with err: %v", req.err) + } + if !proto.Equal(req.req, expectedReq) { + t.Fatalf("got ADS request %T, expected: %T, diff: %s", req.req, expectedReq, cmp.Diff(req.req, expectedReq, cmp.Comparer(proto.Equal))) + } + + // Update with a new edsServiceName. + client.handleUpdate( + &XDSConfig{EDSServiceName: "", LrsLoadReportingServerName: ""}, + attributes.New(xdsinternal.XDSClientID, c), + ) + + expectedReq2 := &xdspb.DiscoveryRequest{ + TypeUrl: edsType, + // The edsServiceName in new update is an empty string, user's dial + // target should be used as eds name to watch. + ResourceNames: []string{testServiceName}, + Node: &corepb.Node{}, + } + + // Make sure the requests are sent to the correct td. + req2 := td.getReq() + if req.err != nil { + t.Fatalf("ads RPC failed with err: %v", req.err) + } + if !proto.Equal(req2.req, expectedReq2) { + t.Fatalf("got ADS request %T, expected: %T, diff: %s", req2.req, expectedReq, cmp.Diff(req2.req, expectedReq2, cmp.Comparer(proto.Equal))) + } +} diff --git a/xds/internal/balancer/xds_client_wrapper.go b/xds/internal/balancer/xds_client_wrapper.go index 46fb1b8f..39a097b9 100644 --- a/xds/internal/balancer/xds_client_wrapper.go +++ b/xds/internal/balancer/xds_client_wrapper.go @@ -19,11 +19,11 @@ package balancer import ( - "context" - "google.golang.org/grpc" + "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" "google.golang.org/grpc/grpclog" + xdsinternal "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/balancer/lrs" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/bootstrap" @@ -44,88 +44,224 @@ var ( bootstrapConfigNew = bootstrap.NewConfig ) -// xdsclientWpapper is responsible for getting the xds client from attributes or +// xdsclientWrapper is responsible for getting the xds client from attributes or // creating a new xds client, and start watching EDS. The given callbacks will // be called with EDS updates or errors. type xdsclientWrapper struct { - ctx context.Context - cancel context.CancelFunc - cancelWatch func() - cancelLoadReport func() - cleanup func() + newEDSUpdate func(*xdsclient.EDSUpdate) error + loseContact func() + bbo balancer.BuildOptions + loadStore lrs.Store + balancerName string + // xdsclient could come from attributes, or created with balancerName. xdsclient xdsClientInterface + + // edsServiceName is the edsServiceName currently being watched, not + // necessary the edsServiceName from service config. + // + // If edsServiceName from service config is an empty, this will be user's + // dial target (because that's what we use to watch EDS). + // + // TODO: remove the empty string related behavior, when we switch to always + // do CDS. + edsServiceName string + cancelEDSWatch func() + loadReportServer string + cancelLoadReport func() } -func (c *xdsclientWrapper) close() { - if c.cancelLoadReport != nil { - c.cancelLoadReport() +// newXDSClientWrapper creates an empty xds_client wrapper that does nothing. It +// can accept xds_client configs, to new/switch xds_client to use. +// +// The given callbacks won't be called until the underlying xds_client is +// working and sends updates. +func newXDSClientWrapper(newEDSUpdate func(*xdsclient.EDSUpdate) error, loseContact func(), bbo balancer.BuildOptions, loadStore lrs.Store) *xdsclientWrapper { + return &xdsclientWrapper{ + newEDSUpdate: newEDSUpdate, + loseContact: loseContact, + bbo: bbo, + loadStore: loadStore, } - if c.cancelWatch != nil { - c.cancelWatch() - } - if c.xdsclient != nil { - // TODO: this shouldn't close xdsclient if it's from attributes. - c.xdsclient.Close() - } - c.cleanup() } -func newXDSClientWrapper(balancerName string, edsServiceName string, bbo balancer.BuildOptions, loadReportServer string, loadStore lrs.Store, newADS func(context.Context, *xdsclient.EDSUpdate) error, loseContact func(ctx context.Context), exitCleanup func()) *xdsclientWrapper { - ctx, cancel := context.WithCancel(context.Background()) - ret := &xdsclientWrapper{ - ctx: ctx, - cancel: cancel, - cleanup: exitCleanup, +// replaceXDSClient replaces xdsclient fields to the newClient if they are +// different. If xdsclient is replaced, the balancerName field will also be +// updated to newBalancerName. +// +// If the old xdsclient is replaced, and was created locally (not from +// attributes), it will be closed. +// +// It returns whether xdsclient is replaced. +func (c *xdsclientWrapper) replaceXDSClient(newClient xdsClientInterface, newBalancerName string) bool { + if c.xdsclient == newClient { + return false + } + oldClient := c.xdsclient + oldBalancerName := c.balancerName + c.xdsclient = newClient + c.balancerName = newBalancerName + if oldBalancerName != "" { + // OldBalancerName!="" means if the old client was not from attributes. + oldClient.Close() + } + return true +} + +// updateXDSClient sets xdsclient in wrapper to the correct one based on the +// attributes and service config. +// +// If client is found in attributes, it will be used, but we also need to decide +// whether to close the old client. +// - if old client was created locally (balancerName is not ""), close it and +// replace it +// - if old client was from previous attributes, only replace it, but don't +// close it +// +// If client is not found in attributes, will need to create a new one only if +// the balancerName (from bootstrap file or from service config) changed. +// - if balancer names are the same, do nothing, and return false +// - if balancer names are different, create new one, and return true +func (c *xdsclientWrapper) updateXDSClient(config *XDSConfig, attr *attributes.Attributes) bool { + if attr != nil { + if clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface); clientFromAttr != nil { + // This will also clear balancerName, to indicate that client is + // from attributes. + return c.replaceXDSClient(clientFromAttr, "") + } } - // TODO: get xdsclient from Attributes instead of creating a new one. - - config := bootstrapConfigNew() - if config.BalancerName == "" { - config.BalancerName = balancerName + clientConfig := bootstrapConfigNew() + if clientConfig.BalancerName == "" { + clientConfig.BalancerName = config.BalancerName } - if config.Creds == nil { + + if c.balancerName == clientConfig.BalancerName { + return false + } + + if clientConfig.Creds == nil { // TODO: Once we start supporting a mechanism to register credential // types, a failure to find the credential type mentioned in the // bootstrap file should result in a failure, and not in using // credentials from the parent channel (passed through the // resolver.BuildOptions). - config.Creds = defaultDialCreds(config.BalancerName, bbo) + clientConfig.Creds = defaultDialCreds(clientConfig.BalancerName, c.bbo) } - var dopts []grpc.DialOption - if bbo.Dialer != nil { - dopts = []grpc.DialOption{grpc.WithContextDialer(bbo.Dialer)} + if dialer := c.bbo.Dialer; dialer != nil { + dopts = []grpc.DialOption{grpc.WithContextDialer(dialer)} } - c, err := xdsclientNew(xdsclient.Options{Config: *config, DialOpts: dopts}) + newClient, err := xdsclientNew(xdsclient.Options{Config: *clientConfig, DialOpts: dopts}) if err != nil { - grpclog.Warningf("failed to create xdsclient, error: %v", err) - return ret + // This should never fail. xdsclientnew does a non-blocking dial, and + // all the config passed in should be validated. + // + // This could leave c.xdsclient as nil if this is the first update. + grpclog.Warningf("eds: failed to create xdsclient, error: %v", err) + return false } - ret.xdsclient = c + return c.replaceXDSClient(newClient, clientConfig.BalancerName) +} - // The clusterName to watch should come from CDS response, via service - // config. If it's an empty string, fallback user's dial target. - nameToWatch := edsServiceName - if nameToWatch == "" { - grpclog.Warningf("eds: cluster name to watch is an empty string. Fallback to user's dial target") - nameToWatch = bbo.Target.Endpoint +// startEDSWatch starts the EDS watch. Caller can call this when the xds_client +// is updated, or the edsServiceName is updated. +// +// Note that if there's already a watch in progress, it's not explicitly +// canceled. Because for each xds_client, there should be only one EDS watch in +// progress. So a new EDS watch implicitly cancels the previous one. +// +// This usually means load report needs to be restarted, but this function does +// NOT do that. Caller needs to call startLoadReport separately. +func (c *xdsclientWrapper) startEDSWatch(nameToWatch string) { + if c.xdsclient == nil { + grpclog.Warningf("xds: xdsclient is nil when trying to start an EDS watch. This means xdsclient wasn't passed in from the resolver, and xdsclient.New failed") + return } - ret.cancelWatch = ret.xdsclient.WatchEDS(nameToWatch, func(update *xdsclient.EDSUpdate, err error) { - if err != nil { - loseContact(ret.ctx) - return - } - if err := newADS(ret.ctx, update); err != nil { + + c.edsServiceName = nameToWatch + c.cancelEDSWatch = c.xdsclient.WatchEDS(c.edsServiceName, func(update *xdsclient.EDSUpdate, err error) { + // TODO: this should trigger a call to `c.loseContact`, when the error + // indicates "lose contact". + if err := c.newEDSUpdate(update); err != nil { grpclog.Warningf("xds: processing new EDS update failed due to %v.", err) } }) - if loadStore != nil { - ret.cancelLoadReport = ret.xdsclient.ReportLoad(loadReportServer, nameToWatch, loadStore) +} + +// startLoadReport starts load reporting. If there's already a load reporting in +// progress, it cancels that. +// +// Caller can cal this when the loadReportServer name changes, but +// edsServiceName doesn't (so we only need to restart load reporting, not EDS +// watch). +func (c *xdsclientWrapper) startLoadReport(edsServiceNameBeingWatched string, loadReportServer string) { + if c.xdsclient == nil { + grpclog.Warningf("xds: xdsclient is nil when trying to start load reporting. This means xdsclient wasn't passed in from the resolver, and xdsclient.New failed") + return + } + if c.loadStore != nil { + if c.cancelLoadReport != nil { + c.cancelLoadReport() + } + c.loadReportServer = loadReportServer + c.cancelLoadReport = c.xdsclient.ReportLoad(c.loadReportServer, edsServiceNameBeingWatched, c.loadStore) + } +} + +// handleUpdate applies the service config and attributes updates to the client, +// including updating the xds_client to use, and updating the EDS name to watch. +func (c *xdsclientWrapper) handleUpdate(config *XDSConfig, attr *attributes.Attributes) { + clientChanged := c.updateXDSClient(config, attr) + + var ( + restartWatchEDS bool + restartLoadReport bool + ) + + // The clusterName to watch should come from CDS response, via service + // config. If it's an empty string, fallback user's dial target. + nameToWatch := config.EDSServiceName + if nameToWatch == "" { + grpclog.Warningf("eds: cluster name to watch is an empty string. Fallback to user's dial target") + nameToWatch = c.bbo.Target.Endpoint + } + + // Need to restart EDS watch when one of the following happens: + // - the xds_client is updated + // - the xds_client didn't change, but the edsServiceName changed + // + // Only need to restart load reporting when: + // - no need to restart EDS, but loadReportServer name changed + if clientChanged || c.edsServiceName != nameToWatch { + restartWatchEDS = true + restartLoadReport = true + } else if c.loadReportServer != config.LrsLoadReportingServerName { + restartLoadReport = true + } + + if restartWatchEDS { + c.startEDSWatch(nameToWatch) + } + + if restartLoadReport { + c.startLoadReport(nameToWatch, config.LrsLoadReportingServerName) + } +} + +func (c *xdsclientWrapper) close() { + if c.xdsclient != nil && c.balancerName != "" { + // Only close xdsclient if it's not from attributes. + c.xdsclient.Close() + } + + if c.cancelLoadReport != nil { + c.cancelLoadReport() + } + if c.cancelEDSWatch != nil { + c.cancelEDSWatch() } - return ret } // defaultDialCreds builds a DialOption containing the credentials to be used diff --git a/xds/internal/balancer/xds_test.go b/xds/internal/balancer/xds_test.go index bc093e26..6da975b0 100644 --- a/xds/internal/balancer/xds_test.go +++ b/xds/internal/balancer/xds_test.go @@ -21,7 +21,6 @@ package balancer import ( "bytes" "encoding/json" - "fmt" "reflect" "sync" "testing" @@ -264,7 +263,7 @@ func (c *fakeXDSClient) WatchEDS(clusterName string, edsCb func(*xdsclient.EDSUp } func (c *fakeXDSClient) callEDSCallback(u *xdsclient.EDSUpdate, err error) { - t := time.NewTimer(time.Second) + t := time.NewTimer(1 * time.Second) select { case <-c.edsCbReceived: t.Stop() @@ -529,7 +528,7 @@ func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) { defer lb.Close() cfg := XDSConfig{ - BalancerName: "", + BalancerName: "wrong-balancer-name", ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, } @@ -558,8 +557,10 @@ func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) { t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s") } - // Callback with an error, the balancer should switch to fallback. - testXDSClient.callEDSCallback(nil, fmt.Errorf("xds client error")) + // Call loseContact explicitly, error in EDS callback is not handled. + // Eventually, this should call EDS ballback with an error that indicates + // "lost contact". + lb.loseContact() // verify fallback balancer B takes over select { @@ -614,7 +615,7 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) { defer lb.Close() cfg := &XDSConfig{ - BalancerName: "", + BalancerName: "wrong-balancer-name", ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, EDSServiceName: testEDSClusterName, @@ -653,8 +654,10 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) { // lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record // to make sure there's a new one created and get the pointer to it. lbABuilder.clearLastBalancer() - // Callback with an error, the balancer should switch to fallback. - testXDSClient.callEDSCallback(nil, fmt.Errorf("xds client error")) + // Call loseContact explicitly, error in EDS callback is not handled. + // Eventually, this should call EDS ballback with an error that indicates + // "lost contact". + lb.loseContact() // switch to fallback // fallback balancer A takes over @@ -703,7 +706,7 @@ func (s) TestXdsBalancerFallBackSignalFromEdsBalancer(t *testing.T) { defer lb.Close() cfg := &XDSConfig{ - BalancerName: "", + BalancerName: "wrong-balancer-name", ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, } @@ -741,8 +744,10 @@ func (s) TestXdsBalancerFallBackSignalFromEdsBalancer(t *testing.T) { // lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record // to make sure there's a new one created and get the pointer to it. lbABuilder.clearLastBalancer() - // Callback with an error, the balancer should switch to fallback. - testXDSClient.callEDSCallback(nil, fmt.Errorf("xds client error")) + // Call loseContact explicitly, error in EDS callback is not handled. + // Eventually, this should call EDS ballback with an error that indicates + // "lost contact". + lb.loseContact() // switch to fallback // fallback balancer A takes over