From e786c2dfb285929db56044f9e376dcdfcd2a9dac Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 27 May 2020 09:55:12 -0700 Subject: [PATCH] xds: eds balancer handleEDSUpdate cleanup (#3639) --- xds/internal/balancer/edsbalancer/eds.go | 60 ++++++------------- .../balancer/edsbalancer/eds_impl_test.go | 2 +- xds/internal/balancer/edsbalancer/eds_test.go | 4 +- .../edsbalancer/xds_client_wrapper.go | 16 +---- .../edsbalancer/xds_client_wrapper_test.go | 23 ++++--- 5 files changed, 40 insertions(+), 65 deletions(-) diff --git a/xds/internal/balancer/edsbalancer/eds.go b/xds/internal/balancer/edsbalancer/eds.go index 02ac314c..1cf9b16e 100644 --- a/xds/internal/balancer/edsbalancer/eds.go +++ b/xds/internal/balancer/edsbalancer/eds.go @@ -23,7 +23,6 @@ import ( "context" "encoding/json" "fmt" - "time" "github.com/google/go-cmp/cmp" "google.golang.org/grpc/balancer" @@ -37,8 +36,7 @@ import ( ) const ( - defaultTimeout = 10 * time.Second - edsName = "eds_experimental" + edsName = "eds_experimental" ) var ( @@ -62,13 +60,13 @@ func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp cc: cc, buildOpts: opts, grpcUpdate: make(chan interface{}), - xdsClientUpdate: make(chan interface{}), + xdsClientUpdate: make(chan *edsUpdate), childPolicyUpdate: buffer.NewUnbounded(), } loadStore := lrs.NewStore() x.logger = grpclog.NewPrefixLogger(loggingPrefix(x)) x.edsImpl = newEDSBalancer(x.cc, x.enqueueChildBalancerState, loadStore, x.logger) - x.client = newXDSClientWrapper(x.handleEDSUpdate, x.loseContact, x.buildOpts, loadStore, x.logger) + x.client = newXDSClientWrapper(x.handleEDSUpdate, x.buildOpts, loadStore, x.logger) x.logger.Infof("Created") go x.run() return x @@ -117,7 +115,7 @@ type edsBalancer struct { // edsBalancer continuously monitor the channels below, and will handle events from them in sync. grpcUpdate chan interface{} - xdsClientUpdate chan interface{} + xdsClientUpdate chan *edsUpdate childPolicyUpdate *buffer.Unbounded client *xdsclientWrapper // may change when passed a different service config @@ -140,12 +138,8 @@ func (x *edsBalancer) run() { u := update.(*balancerStateWithPriority) x.edsImpl.updateState(u.priority, u.s) case <-x.ctx.Done(): - if x.client != nil { - x.client.close() - } - if x.edsImpl != nil { - x.edsImpl.close() - } + x.client.close() + x.edsImpl.close() return } } @@ -154,9 +148,7 @@ func (x *edsBalancer) run() { func (x *edsBalancer) handleGRPCUpdate(update interface{}) { switch u := update.(type) { case *subConnStateUpdate: - if x.edsImpl != nil { - x.edsImpl.handleSubConnStateChange(u.sc, u.state.ConnectivityState) - } + x.edsImpl.handleSubConnStateChange(u.sc, u.state.ConnectivityState) case *balancer.ClientConnState: x.logger.Infof("Receive update from resolver, balancer config: %+v", u.BalancerConfig) cfg, _ := u.BalancerConfig.(*EDSConfig) @@ -174,7 +166,7 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) { // We will update the edsImpl with the new child policy, if we got a // different one. - if x.edsImpl != nil && !cmp.Equal(cfg.ChildPolicy, x.config.ChildPolicy) { + if !cmp.Equal(cfg.ChildPolicy, x.config.ChildPolicy) { if cfg.ChildPolicy != nil { x.edsImpl.handleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config) } else { @@ -189,17 +181,13 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) { } } -func (x *edsBalancer) handleXDSClientUpdate(update interface{}) { - switch u := update.(type) { - // TODO: this func should accept (xdsclient.EndpointsUpdate, error), and process - // the error, instead of having a separate loseContact signal. - case xdsclient.EndpointsUpdate: - x.edsImpl.handleEDSResponse(u) - case *loseContact: - // loseContact can be useful for going into fallback. - default: - panic("unexpected xds client update type") +func (x *edsBalancer) handleXDSClientUpdate(update *edsUpdate) { + if update.err != nil { + // TODO: handle errors from EDS callback. E.g. if CDS resource is + // removed, the EDS watch should be canceled. + return } + x.edsImpl.handleEDSResponse(update.resp) } type subConnStateUpdate struct { @@ -233,24 +221,14 @@ func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error { return nil } -func (x *edsBalancer) handleEDSUpdate(resp xdsclient.EndpointsUpdate) error { - // TODO: this function should take (resp, error), and send them together on - // the channel. There doesn't need to be a separate `loseContact` function. - select { - case x.xdsClientUpdate <- resp: - case <-x.ctx.Done(): - } - - return nil +type edsUpdate struct { + resp xdsclient.EndpointsUpdate + err error } -type loseContact struct { -} - -// TODO: delete loseContact when handleEDSUpdate takes (resp, error). -func (x *edsBalancer) loseContact() { +func (x *edsBalancer) handleEDSUpdate(resp xdsclient.EndpointsUpdate, err error) { select { - case x.xdsClientUpdate <- &loseContact{}: + case x.xdsClientUpdate <- &edsUpdate{resp: resp, err: err}: case <-x.ctx.Done(): } } diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index 1e449430..d308f97c 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -412,7 +412,7 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) { for i := 0; i < 5; i++ { _, err := p0.Pick(balancer.PickInfo{}) if err != testutils.ErrTestConstPicker { - t.Fatalf("picker.Pick, got err %q, want err %q", err, testutils.ErrTestConstPicker) + t.Fatalf("picker.Pick, got err %+v, want err %+v", err, testutils.ErrTestConstPicker) } } diff --git a/xds/internal/balancer/edsbalancer/eds_test.go b/xds/internal/balancer/edsbalancer/eds_test.go index 5f8b04d6..ed1e44de 100644 --- a/xds/internal/balancer/edsbalancer/eds_test.go +++ b/xds/internal/balancer/edsbalancer/eds_test.go @@ -108,10 +108,12 @@ func (f *fakeEDSBalancer) handleChildPolicy(name string, config json.RawMessage) f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config}) } -func (f *fakeEDSBalancer) close() {} func (f *fakeEDSBalancer) handleEDSResponse(edsResp xdsclient.EndpointsUpdate) {} + func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {} +func (f *fakeEDSBalancer) close() {} + func (f *fakeEDSBalancer) waitForChildPolicy(wantPolicy *loadBalancingConfig) error { val, err := f.childPolicy.Receive() if err != nil { diff --git a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go index ab06c1a2..15fdd7d4 100644 --- a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go +++ b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go @@ -50,8 +50,7 @@ var ( type xdsclientWrapper struct { logger *grpclog.PrefixLogger - newEDSUpdate func(xdsclient.EndpointsUpdate) error - loseContact func() + newEDSUpdate func(xdsclient.EndpointsUpdate, error) bbo balancer.BuildOptions loadStore lrs.Store @@ -78,11 +77,10 @@ type xdsclientWrapper struct { // // The given callbacks won't be called until the underlying xds_client is // working and sends updates. -func newXDSClientWrapper(newEDSUpdate func(xdsclient.EndpointsUpdate) error, loseContact func(), bbo balancer.BuildOptions, loadStore lrs.Store, logger *grpclog.PrefixLogger) *xdsclientWrapper { +func newXDSClientWrapper(newEDSUpdate func(xdsclient.EndpointsUpdate, error), bbo balancer.BuildOptions, loadStore lrs.Store, logger *grpclog.PrefixLogger) *xdsclientWrapper { return &xdsclientWrapper{ logger: logger, newEDSUpdate: newEDSUpdate, - loseContact: loseContact, bbo: bbo, loadStore: loadStore, } @@ -188,16 +186,8 @@ func (c *xdsclientWrapper) startEndpointsWatch(nameToWatch string) { c.cancelEndpointsWatch() } cancelEDSWatch := c.xdsclient.WatchEndpoints(c.edsServiceName, func(update xdsclient.EndpointsUpdate, err error) { - if err != nil { - // TODO: this should trigger a call to `c.loseContact`, when the - // error indicates "lose contact". - c.logger.Warningf("Watch error from xds-client %p: %v", c.xdsclient, err) - return - } c.logger.Infof("Watch update from xds-client %p, content: %+v", c.xdsclient, update) - if err := c.newEDSUpdate(update); err != nil { - c.logger.Warningf("xds: processing new EDS update failed due to %v.", err) - } + c.newEDSUpdate(update, err) }) c.logger.Infof("Watch started on resource name %v with xds-client %p", c.edsServiceName, c.xdsclient) c.cancelEndpointsWatch = func() { diff --git a/xds/internal/balancer/edsbalancer/xds_client_wrapper_test.go b/xds/internal/balancer/edsbalancer/xds_client_wrapper_test.go index 15124a8c..f7cc4bda 100644 --- a/xds/internal/balancer/edsbalancer/xds_client_wrapper_test.go +++ b/xds/internal/balancer/edsbalancer/xds_client_wrapper_test.go @@ -64,7 +64,7 @@ func (s) TestClientWrapperWatchEDS(t *testing.T) { } defer cleanup() - cw := newXDSClientWrapper(nil, nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, nil) + cw := newXDSClientWrapper(nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, nil) defer cw.close() for _, test := range []struct { @@ -159,12 +159,11 @@ func (s) TestClientWrapperWatchEDS(t *testing.T) { // edsBalancer with the received error. func (s) TestClientWrapperHandleUpdateError(t *testing.T) { edsRespChan := testutils.NewChannel() - newEDS := func(update xdsclient.EndpointsUpdate) error { - edsRespChan.Send(update) - return nil + newEDS := func(update xdsclient.EndpointsUpdate, err error) { + edsRespChan.Send(&edsUpdate{resp: update, err: err}) } - cw := newXDSClientWrapper(newEDS, nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, nil) + cw := newXDSClientWrapper(newEDS, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, nil) defer cw.close() xdsC := fakeclient.NewClient() @@ -176,14 +175,20 @@ func (s) TestClientWrapperHandleUpdateError(t *testing.T) { if gotCluster != testEDSClusterName { t.Fatalf("xdsClient.WatchEndpoints() called with cluster: %v, want %v", gotCluster, testEDSClusterName) } - xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, errors.New("EDS watch callback error")) + watchErr := errors.New("EDS watch callback error") + xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, watchErr) // The callback is called with an error, expect no update from edsRespChan. // // TODO: check for loseContact() when errors indicating "lose contact" are // handled correctly. - if gotUpdate, gotErr := edsRespChan.Receive(); gotErr != testutils.ErrRecvTimeout { - t.Fatalf("edsBalancer got edsUpdate {%+v, %v}, when none was expected", gotUpdate, gotErr) + gotUpdate, err := edsRespChan.Receive() + if err != nil { + t.Fatalf("edsBalancer failed to get edsUpdate %v", err) + } + update := gotUpdate.(*edsUpdate) + if !cmp.Equal(update.resp, (xdsclient.EndpointsUpdate{})) || update.err != watchErr { + t.Fatalf("want update {nil, %v}, got %+v", watchErr, update) } } @@ -198,7 +203,7 @@ func (s) TestClientWrapperGetsXDSClientInAttributes(t *testing.T) { } defer func() { xdsclientNew = oldxdsclientNew }() - cw := newXDSClientWrapper(nil, nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, nil) + cw := newXDSClientWrapper(nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, nil) defer cw.close() // Verify that the eds watch is registered for the expected resource name.