diff --git a/xds/internal/balancer/edsbalancer/eds.go b/xds/internal/balancer/edsbalancer/eds.go index 1cf9b16e..59bbe5fb 100644 --- a/xds/internal/balancer/edsbalancer/eds.go +++ b/xds/internal/balancer/edsbalancer/eds.go @@ -145,6 +145,32 @@ func (x *edsBalancer) run() { } } +// handleErrorFromUpdate handles both the error from parent ClientConn (from CDS +// balancer) and the error from xds client (from the watcher). fromParent is +// true if error is from parent ClientConn. +// +// If the error is a connection error, it should be handled for fallback purposes. +// +// If the error is resource-not-found: +// - If it's from CDS balancer (shows as a resolver error), it means LDS or CDS +// resources were removed. The EDS watch should be canceled. +// - If it's from xds client, it means EDS resources were removed. The EDS +// watcher should keep watching. +// In both cases, the sub-balancers will be closed, and the future picks will +// fail. +func (x *edsBalancer) handleErrorFromUpdate(err error, fromParent bool) { + if xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound { + if fromParent { + // This is an error from the parent ClientConn (can be the parent + // CDS balancer), and is a resource-not-found error. This means the + // resource (can be either LDS or CDS) was removed. Stop the EDS + // watch. + x.client.cancelWatch() + } + x.edsImpl.handleEDSResponse(xdsclient.EndpointsUpdate{}) + } +} + func (x *edsBalancer) handleGRPCUpdate(update interface{}) { switch u := update.(type) { case *subConnStateUpdate: @@ -175,6 +201,8 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) { } x.config = cfg + case error: + x.handleErrorFromUpdate(u, true) default: // unreachable path panic("wrong update type") @@ -182,9 +210,8 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) { } func (x *edsBalancer) handleXDSClientUpdate(update *edsUpdate) { - if update.err != nil { - // TODO: handle errors from EDS callback. E.g. 
if CDS resource is - // removed, the EDS watch should be canceled. + if err := update.err; err != nil { + x.handleErrorFromUpdate(err, false) return } x.edsImpl.handleEDSResponse(update.resp) @@ -206,11 +233,11 @@ func (x *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Sub } } -func (x *edsBalancer) ResolverError(error) { - // TODO: Need to distinguish between connection errors and resource removed - // errors. For the former, we will need to handle it later on for fallback. - // For the latter, handle it by stopping the watch, closing sub-balancers - // and pickers. +func (x *edsBalancer) ResolverError(err error) { + select { + case x.grpcUpdate <- err: + case <-x.ctx.Done(): + } } func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error { diff --git a/xds/internal/balancer/edsbalancer/eds_impl.go b/xds/internal/balancer/edsbalancer/eds_impl.go index 0aeb6cc3..979eee45 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl.go +++ b/xds/internal/balancer/edsbalancer/eds_impl.go @@ -18,11 +18,13 @@ package edsbalancer import ( "encoding/json" + "reflect" "sync" "time" "github.com/google/go-cmp/cmp" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/balancer/weightedroundrobin" "google.golang.org/grpc/codes" @@ -66,6 +68,7 @@ type edsBalancerImpl struct { subBalancerBuilder balancer.Builder loadStore lrs.Store priorityToLocalities map[priorityType]*balancerGroupWithConfig + respReceived bool // There's no need to hold any mutexes at the same time. 
The order to take // mutex should be: priorityMu > subConnMu, but this is implicit via @@ -183,6 +186,17 @@ func (edsImpl *edsBalancerImpl) handleEDSResponse(edsResp xdsclient.EndpointsUpd // - socketAddress.GetNamedPort(), socketAddress.GetResolverName() // - resolve endpoint's name with another resolver + // If the first EDS update is an empty update, nothing is changing from the + // previous update (which is the default empty value). We need to explicitly + // handle first update being empty, and send a transient failure picker. + // + // TODO: define Equal() on type EndpointUpdate to avoid DeepEqual. And do + // the same for the other types. + if !edsImpl.respReceived && reflect.DeepEqual(edsResp, xdsclient.EndpointsUpdate{}) { + edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(errAllPrioritiesRemoved)}) + } + edsImpl.respReceived = true + edsImpl.updateDrops(edsResp.Drops) // Filter out all localities with weight 0. diff --git a/xds/internal/balancer/edsbalancer/eds_impl_priority.go b/xds/internal/balancer/edsbalancer/eds_impl_priority.go index a279ddc6..c2ce3123 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_priority.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_priority.go @@ -18,6 +18,7 @@ package edsbalancer import ( + "errors" "fmt" "time" @@ -27,6 +28,8 @@ import ( "google.golang.org/grpc/grpclog" ) +var errAllPrioritiesRemoved = errors.New("eds: no locality is provided, all priorities are removed") + // handlePriorityChange handles priority after EDS adds/removes a // priority. // @@ -46,7 +49,7 @@ func (edsImpl *edsBalancerImpl) handlePriorityChange() { // Everything was removed by EDS. 
if !edsImpl.priorityLowest.isSet() { edsImpl.priorityInUse = newPriorityTypeUnset() - edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(balancer.ErrTransientFailure)}) + edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(errAllPrioritiesRemoved)}) return } diff --git a/xds/internal/balancer/edsbalancer/eds_impl_priority_test.go b/xds/internal/balancer/edsbalancer/eds_impl_priority_test.go index c98f7205..d9b9ff4a 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_priority_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_priority_test.go @@ -553,8 +553,8 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) { // Test pick return TransientFailure. pFail := <-cc.NewPickerCh for i := 0; i < 5; i++ { - if _, err := pFail.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure { - t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err) + if _, err := pFail.Pick(balancer.PickInfo{}); err != errAllPrioritiesRemoved { + t.Fatalf("want pick error %v, got %v", errAllPrioritiesRemoved, err) } } diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index d308f97c..3f47b720 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -18,6 +18,7 @@ package edsbalancer import ( "fmt" + "reflect" "sort" "testing" "time" @@ -386,6 +387,70 @@ func (s) TestClose(t *testing.T) { edsb.close() } +// TestEDS_EmptyUpdate covers the cases when eds impl receives an empty update. +// +// It should send an error picker with transient failure to the parent. +func (s) TestEDS_EmptyUpdate(t *testing.T) { + cc := testutils.NewTestClientConn(t) + edsb := newEDSBalancerImpl(cc, nil, nil, nil) + edsb.enqueueChildBalancerStateUpdate = edsb.updateState + + // The first update is an empty update. 
+ edsb.handleEDSResponse(xdsclient.EndpointsUpdate{}) + // Pick should fail with transient failure, and all priority removed error. + perr0 := <-cc.NewPickerCh + for i := 0; i < 5; i++ { + _, err := perr0.Pick(balancer.PickInfo{}) + if !reflect.DeepEqual(err, errAllPrioritiesRemoved) { + t.Fatalf("picker.Pick, got error %v, want error %v", err, errAllPrioritiesRemoved) + } + } + + // One locality with one backend. + clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + edsb.handleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build())) + + sc1 := <-cc.NewSubConnCh + edsb.handleSubConnStateChange(sc1, connectivity.Connecting) + edsb.handleSubConnStateChange(sc1, connectivity.Ready) + + // Pick with only the first backend. + p1 := <-cc.NewPickerCh + for i := 0; i < 5; i++ { + gotSCSt, _ := p1.Pick(balancer.PickInfo{}) + if !reflect.DeepEqual(gotSCSt.SubConn, sc1) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) + } + } + + edsb.handleEDSResponse(xdsclient.EndpointsUpdate{}) + // Pick should fail with transient failure, and all priority removed error. + perr1 := <-cc.NewPickerCh + for i := 0; i < 5; i++ { + _, err := perr1.Pick(balancer.PickInfo{}) + if !reflect.DeepEqual(err, errAllPrioritiesRemoved) { + t.Fatalf("picker.Pick, got error %v, want error %v", err, errAllPrioritiesRemoved) + } + } + + // Handle another update with priorities and localities. + edsb.handleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build())) + + sc2 := <-cc.NewSubConnCh + edsb.handleSubConnStateChange(sc2, connectivity.Connecting) + edsb.handleSubConnStateChange(sc2, connectivity.Ready) + + // Pick with only the first backend. 
+ p2 := <-cc.NewPickerCh + for i := 0; i < 5; i++ { + gotSCSt, _ := p2.Pick(balancer.PickInfo{}) + if !reflect.DeepEqual(gotSCSt.SubConn, sc2) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2) + } + } +} + // Create XDS balancer, and update sub-balancer before handling eds responses. // Then switch between round-robin and test-const-balancer after handling first // eds response. diff --git a/xds/internal/balancer/edsbalancer/eds_test.go b/xds/internal/balancer/edsbalancer/eds_test.go index ed1e44de..02cb9263 100644 --- a/xds/internal/balancer/edsbalancer/eds_test.go +++ b/xds/internal/balancer/edsbalancer/eds_test.go @@ -22,6 +22,7 @@ import ( "bytes" "encoding/json" "fmt" + "reflect" "testing" corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" @@ -97,6 +98,7 @@ type fakeEDSBalancer struct { cc balancer.ClientConn childPolicy *testutils.Channel subconnStateChange *testutils.Channel + edsUpdate *testutils.Channel loadStore lrs.Store } @@ -108,7 +110,9 @@ func (f *fakeEDSBalancer) handleChildPolicy(name string, config json.RawMessage) f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config}) } -func (f *fakeEDSBalancer) handleEDSResponse(edsResp xdsclient.EndpointsUpdate) {} +func (f *fakeEDSBalancer) handleEDSResponse(edsResp xdsclient.EndpointsUpdate) { + f.edsUpdate.Send(edsResp) +} func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {} @@ -138,11 +142,24 @@ func (f *fakeEDSBalancer) waitForSubConnStateChange(wantState *scStateChange) er return nil } +func (f *fakeEDSBalancer) waitForEDSResponse(wantUpdate xdsclient.EndpointsUpdate) error { + val, err := f.edsUpdate.Receive() + if err != nil { + return fmt.Errorf("error waiting for edsUpdate: %v", err) + } + gotUpdate := val.(xdsclient.EndpointsUpdate) + if !reflect.DeepEqual(gotUpdate, wantUpdate) { + return fmt.Errorf("got edsUpdate %+v, want %+v", gotUpdate, wantUpdate) + } + return nil +} + func newFakeEDSBalancer(cc balancer.ClientConn, 
loadStore lrs.Store) edsBalancerImplInterface { return &fakeEDSBalancer{ cc: cc, childPolicy: testutils.NewChannelWithSize(10), subconnStateChange: testutils.NewChannelWithSize(10), + edsUpdate: testutils.NewChannelWithSize(10), loadStore: loadStore, } } @@ -399,6 +416,118 @@ func (s) TestXDSSubConnStateChange(t *testing.T) { edsLB.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state}) } +// TestErrorFromXDSClientUpdate verifies that errors from xdsclient update are +// handled correctly. +// +// If it's resource-not-found, watch will NOT be canceled, the EDS impl will +// receive an empty EDS update, and new RPCs will fail. +// +// If it's connection error, nothing will happen. This will need to change to +// handle fallback. +func (s) TestErrorFromXDSClientUpdate(t *testing.T) { + edsLBCh := testutils.NewChannel() + xdsClientCh := testutils.NewChannel() + cancel := setup(edsLBCh, xdsClientCh) + defer cancel() + + builder := balancer.Get(edsName) + cc := newNoopTestClientConn() + edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) + if !ok { + t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) + } + defer edsB.Close() + + edsB.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &EDSConfig{ + BalancerName: testBalancerNameFooBar, + EDSServiceName: testEDSClusterName, + }, + }) + + xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar) + xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil) + edsLB := waitForNewEDSLB(t, edsLBCh) + if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil { + t.Fatalf("EDS impl got unexpected EDS response: %v", err) + } + + connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error") + xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, connectionErr) + if err := xdsC.WaitForCancelEDSWatch(); err == nil { + t.Fatal("watch was 
canceled, want not canceled (timeout error)") + } + if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err == nil { + t.Fatal("eds impl got EDS resp, want timeout error") + } + + resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "edsBalancer resource not found error") + xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, resourceErr) + // Even if error is resource not found, watch shouldn't be canceled, because + // this is an EDS resource removed (and xds client actually never sends this + // error, but we still handle it). + if err := xdsC.WaitForCancelEDSWatch(); err == nil { + t.Fatal("watch was canceled, want not canceled (timeout error)") + } + if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil { + t.Fatalf("eds impl expecting empty update, got %v", err) + } +} + +// TestErrorFromResolver verifies that resolver errors are handled correctly. +// +// If it's resource-not-found, watch will be canceled, the EDS impl will receive +// an empty EDS update, and new RPCs will fail. +// +// If it's connection error, nothing will happen. This will need to change to +// handle fallback. 
+func (s) TestErrorFromResolver(t *testing.T) { + edsLBCh := testutils.NewChannel() + xdsClientCh := testutils.NewChannel() + cancel := setup(edsLBCh, xdsClientCh) + defer cancel() + + builder := balancer.Get(edsName) + cc := newNoopTestClientConn() + edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer) + if !ok { + t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB) + } + defer edsB.Close() + + edsB.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &EDSConfig{ + BalancerName: testBalancerNameFooBar, + EDSServiceName: testEDSClusterName, + }, + }) + + xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar) + xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil) + edsLB := waitForNewEDSLB(t, edsLBCh) + if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil { + t.Fatalf("EDS impl got unexpected EDS response: %v", err) + } + + connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error") + edsB.ResolverError(connectionErr) + if err := xdsC.WaitForCancelEDSWatch(); err == nil { + t.Fatal("watch was canceled, want not canceled (timeout error)") + } + if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err == nil { + t.Fatal("eds impl got EDS resp, want timeout error") + } + + resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "edsBalancer resource not found error") + edsB.ResolverError(resourceErr) + if err := xdsC.WaitForCancelEDSWatch(); err != nil { + t.Fatalf("want watch to be canceled, waitForCancel failed: %v", err) + } + if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil { + t.Fatalf("EDS impl got unexpected EDS response: %v", err) + } +} + func (s) TestXDSBalancerConfigParsing(t *testing.T) { const testEDSName = "eds.service" var testLRSName = "lrs.server" diff --git 
a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go index 15fdd7d4..0f987a94 100644 --- a/xds/internal/balancer/edsbalancer/xds_client_wrapper.go +++ b/xds/internal/balancer/edsbalancer/xds_client_wrapper.go @@ -258,20 +258,25 @@ func (c *xdsclientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attr } } -func (c *xdsclientWrapper) close() { - if c.xdsclient != nil && c.balancerName != "" { - // Only close xdsclient if it's not from attributes. - c.xdsclient.Close() - } - +func (c *xdsclientWrapper) cancelWatch() { + c.loadReportServer = nil if c.cancelLoadReport != nil { c.cancelLoadReport() } + c.edsServiceName = "" if c.cancelEndpointsWatch != nil { c.cancelEndpointsWatch() } } +func (c *xdsclientWrapper) close() { + c.cancelWatch() + if c.xdsclient != nil && c.balancerName != "" { + // Only close xdsclient if it's not from attributes. + c.xdsclient.Close() + } +} + // defaultDialCreds builds a DialOption containing the credentials to be used // while talking to the xDS server (this is done only if the xds bootstrap // process does not return any credentials to use). If the parent channel diff --git a/xds/internal/client/errors.go b/xds/internal/client/errors.go new file mode 100644 index 00000000..34ae2738 --- /dev/null +++ b/xds/internal/client/errors.go @@ -0,0 +1,60 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package client + +import "fmt" + +// ErrorType is the type of the error that the watcher will receive from the xds +// client. +type ErrorType int + +const ( + // ErrorTypeUnknown indicates the error doesn't have a specific type. It is + // the default value, and is returned if the error is not an xds error. + ErrorTypeUnknown ErrorType = iota + // ErrorTypeConnection indicates a connection error from the gRPC client. + ErrorTypeConnection + // ErrorTypeResourceNotFound indicates a resource is not found from the xds + // response. It's typically returned if the resource is removed in the xds + // server. + ErrorTypeResourceNotFound +) + +type xdsClientError struct { + t ErrorType + desc string +} + +func (e *xdsClientError) Error() string { + return e.desc +} + +// NewErrorf creates an xds client error. The callbacks are called with this +// error, to pass additional information about the error. +func NewErrorf(t ErrorType, format string, args ...interface{}) error { + return &xdsClientError{t: t, desc: fmt.Sprintf(format, args...)} +} + +// ErrType returns the error's type. +func ErrType(e error) ErrorType { + if xe, ok := e.(*xdsClientError); ok { + return xe.t + } + return ErrorTypeUnknown +} diff --git a/xds/internal/testutils/fakeclient/client.go b/xds/internal/testutils/fakeclient/client.go index 922fab1e..aacade2f 100644 --- a/xds/internal/testutils/fakeclient/client.go +++ b/xds/internal/testutils/fakeclient/client.go @@ -143,6 +143,13 @@ func (xdsC *Client) InvokeWatchEDSCallback(update xdsclient.EndpointsUpdate, err xdsC.edsCb(update, err) } +// WaitForCancelEDSWatch waits for an EDS watch to be cancelled within a +// reasonable timeout, and returns testutils.ErrRecvTimeout otherwise. +func (xdsC *Client) WaitForCancelEDSWatch() error { + _, err := xdsC.edsCancelCh.Receive() + return err +} + // ReportLoadArgs wraps the arguments passed to ReportLoad. type ReportLoadArgs struct { // Server is the name of the server to which the load is reported.