xds: handle errors in eds balancer (#3645)

This commit is contained in:
Menghan Li
2020-05-28 09:54:18 -07:00
committed by GitHub
parent 6c9e30c09d
commit acd5f84904
9 changed files with 328 additions and 18 deletions

View File

@ -145,6 +145,32 @@ func (x *edsBalancer) run() {
}
}
// handleErrorFromUpdate handles both the error from parent ClientConn (from CDS
// balancer) and the error from xds client (from the watcher). fromParent is
// true if error is from parent ClientConn.
//
// If the error is connection error, it should be handled for fallback purposes.
//
// If the error is resource-not-found:
// - If it's from CDS balancer (shows as a resolver error), it means LDS or CDS
// resources were removed. The EDS watch should be canceled.
// - If it's from xds client, it means EDS resource were removed. The EDS
// watcher should keep watching.
// In both cases, the sub-balancers will be closed, and the future picks will
// fail.
func (x *edsBalancer) handleErrorFromUpdate(err error, fromParent bool) {
if xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound {
if fromParent {
// This is an error from the parent ClientConn (can be the parent
// CDS balancer), and is a resource-not-found error. This means the
// resource (can be either LDS or CDS) was removed. Stop the EDS
// watch.
x.client.cancelWatch()
}
x.edsImpl.handleEDSResponse(xdsclient.EndpointsUpdate{})
}
}
func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
switch u := update.(type) {
case *subConnStateUpdate:
@ -175,6 +201,8 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
}
x.config = cfg
case error:
x.handleErrorFromUpdate(u, true)
default:
// unreachable path
panic("wrong update type")
@ -182,9 +210,8 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
}
func (x *edsBalancer) handleXDSClientUpdate(update *edsUpdate) {
if update.err != nil {
// TODO: handle errors from EDS callback. E.g. if CDS resource is
// removed, the EDS watch should be canceled.
if err := update.err; err != nil {
x.handleErrorFromUpdate(err, false)
return
}
x.edsImpl.handleEDSResponse(update.resp)
@ -206,11 +233,11 @@ func (x *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Sub
}
}
func (x *edsBalancer) ResolverError(error) {
// TODO: Need to distinguish between connection errors and resource removed
// errors. For the former, we will need to handle it later on for fallback.
// For the latter, handle it by stopping the watch, closing sub-balancers
// and pickers.
func (x *edsBalancer) ResolverError(err error) {
select {
case x.grpcUpdate <- err:
case <-x.ctx.Done():
}
}
func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {

View File

@ -18,11 +18,13 @@ package edsbalancer
import (
"encoding/json"
"reflect"
"sync"
"time"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/codes"
@ -66,6 +68,7 @@ type edsBalancerImpl struct {
subBalancerBuilder balancer.Builder
loadStore lrs.Store
priorityToLocalities map[priorityType]*balancerGroupWithConfig
respReceived bool
// There's no need to hold any mutexes at the same time. The order to take
// mutex should be: priorityMu > subConnMu, but this is implicit via
@ -183,6 +186,17 @@ func (edsImpl *edsBalancerImpl) handleEDSResponse(edsResp xdsclient.EndpointsUpd
// - socketAddress.GetNamedPort(), socketAddress.GetResolverName()
// - resolve endpoint's name with another resolver
// If the first EDS update is an empty update, nothing is changing from the
// previous update (which is the default empty value). We need to explicitly
// handle first update being empty, and send a transient failure picker.
//
// TODO: define Equal() on type EndpointUpdate to avoid DeepEqual. And do
// the same for the other types.
if !edsImpl.respReceived && reflect.DeepEqual(edsResp, xdsclient.EndpointsUpdate{}) {
edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(errAllPrioritiesRemoved)})
}
edsImpl.respReceived = true
edsImpl.updateDrops(edsResp.Drops)
// Filter out all localities with weight 0.

View File

@ -18,6 +18,7 @@
package edsbalancer
import (
"errors"
"fmt"
"time"
@ -27,6 +28,8 @@ import (
"google.golang.org/grpc/grpclog"
)
var errAllPrioritiesRemoved = errors.New("eds: no locality is provided, all priorities are removed")
// handlePriorityChange handles priority after EDS adds/removes a
// priority.
//
@ -46,7 +49,7 @@ func (edsImpl *edsBalancerImpl) handlePriorityChange() {
// Everything was removed by EDS.
if !edsImpl.priorityLowest.isSet() {
edsImpl.priorityInUse = newPriorityTypeUnset()
edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(balancer.ErrTransientFailure)})
edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(errAllPrioritiesRemoved)})
return
}

View File

@ -553,8 +553,8 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) {
// Test pick return TransientFailure.
pFail := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := pFail.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure {
t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err)
if _, err := pFail.Pick(balancer.PickInfo{}); err != errAllPrioritiesRemoved {
t.Fatalf("want pick error %v, got %v", errAllPrioritiesRemoved, err)
}
}

View File

@ -18,6 +18,7 @@ package edsbalancer
import (
"fmt"
"reflect"
"sort"
"testing"
"time"
@ -386,6 +387,70 @@ func (s) TestClose(t *testing.T) {
edsb.close()
}
// TestEDS_EmptyUpdate covers the cases when eds impl receives an empty update.
//
// It should send an error picker with transient failure to the parent.
func (s) TestEDS_EmptyUpdate(t *testing.T) {
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
// The first update is an empty update.
edsb.handleEDSResponse(xdsclient.EndpointsUpdate{})
// Pick should fail with transient failure, and all priority removed error.
perr0 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
_, err := perr0.Pick(balancer.PickInfo{})
if !reflect.DeepEqual(err, errAllPrioritiesRemoved) {
t.Fatalf("picker.Pick, got error %v, want error %v", err, errAllPrioritiesRemoved)
}
}
// One locality with one backend.
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.handleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
sc1 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc1, connectivity.Connecting)
edsb.handleSubConnStateChange(sc1, connectivity.Ready)
// Pick with only the first backend.
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !reflect.DeepEqual(gotSCSt.SubConn, sc1) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
}
edsb.handleEDSResponse(xdsclient.EndpointsUpdate{})
// Pick should fail with transient failure, and all priority removed error.
perr1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
_, err := perr1.Pick(balancer.PickInfo{})
if !reflect.DeepEqual(err, errAllPrioritiesRemoved) {
t.Fatalf("picker.Pick, got error %v, want error %v", err, errAllPrioritiesRemoved)
}
}
// Handle another update with priorities and localities.
edsb.handleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
sc2 := <-cc.NewSubConnCh
edsb.handleSubConnStateChange(sc2, connectivity.Connecting)
edsb.handleSubConnStateChange(sc2, connectivity.Ready)
// Pick with only the first backend.
p2 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p2.Pick(balancer.PickInfo{})
if !reflect.DeepEqual(gotSCSt.SubConn, sc2) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2)
}
}
}
// Create XDS balancer, and update sub-balancer before handling eds responses.
// Then switch between round-robin and test-const-balancer after handling first
// eds response.

View File

@ -22,6 +22,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"reflect"
"testing"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
@ -97,6 +98,7 @@ type fakeEDSBalancer struct {
cc balancer.ClientConn
childPolicy *testutils.Channel
subconnStateChange *testutils.Channel
edsUpdate *testutils.Channel
loadStore lrs.Store
}
@ -108,7 +110,9 @@ func (f *fakeEDSBalancer) handleChildPolicy(name string, config json.RawMessage)
f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config})
}
func (f *fakeEDSBalancer) handleEDSResponse(edsResp xdsclient.EndpointsUpdate) {}
func (f *fakeEDSBalancer) handleEDSResponse(edsResp xdsclient.EndpointsUpdate) {
f.edsUpdate.Send(edsResp)
}
func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {}
@ -138,11 +142,24 @@ func (f *fakeEDSBalancer) waitForSubConnStateChange(wantState *scStateChange) er
return nil
}
func (f *fakeEDSBalancer) waitForEDSResponse(wantUpdate xdsclient.EndpointsUpdate) error {
val, err := f.edsUpdate.Receive()
if err != nil {
return fmt.Errorf("error waiting for edsUpdate: %v", err)
}
gotUpdate := val.(xdsclient.EndpointsUpdate)
if !reflect.DeepEqual(gotUpdate, wantUpdate) {
return fmt.Errorf("got edsUpdate %+v, want %+v", gotUpdate, wantUpdate)
}
return nil
}
func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerImplInterface {
return &fakeEDSBalancer{
cc: cc,
childPolicy: testutils.NewChannelWithSize(10),
subconnStateChange: testutils.NewChannelWithSize(10),
edsUpdate: testutils.NewChannelWithSize(10),
loadStore: loadStore,
}
}
@ -399,6 +416,118 @@ func (s) TestXDSSubConnStateChange(t *testing.T) {
edsLB.waitForSubConnStateChange(&scStateChange{sc: fsc, state: state})
}
// TestErrorFromXDSClientUpdate verifies that errros from xdsclient update are
// handled correctly.
//
// If it's resource-not-found, watch will NOT be canceled, the EDS impl will
// receive an empty EDS update, and new RPCs will fail.
//
// If it's connection error, nothing will happen. This will need to change to
// handle fallback.
func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsClientCh := testutils.NewChannel()
cancel := setup(edsLBCh, xdsClientCh)
defer cancel()
builder := balancer.Get(edsName)
cc := newNoopTestClientConn()
edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
if !ok {
t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
}
defer edsB.Close()
edsB.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{
BalancerName: testBalancerNameFooBar,
EDSServiceName: testEDSClusterName,
},
})
xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
edsLB := waitForNewEDSLB(t, edsLBCh)
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil {
t.Fatalf("EDS impl got unexpected EDS response: %v", err)
}
connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, connectionErr)
if err := xdsC.WaitForCancelEDSWatch(); err == nil {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err == nil {
t.Fatal("eds impl got EDS resp, want timeout error")
}
resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "edsBalancer resource not found error")
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, resourceErr)
// Even if error is resource not found, watch shouldn't be canceled, because
// this is an EDS resource removed (and xds client actually never sends this
// error, but we still handles it).
if err := xdsC.WaitForCancelEDSWatch(); err == nil {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil {
t.Fatalf("eds impl expecting empty update, got %v", err)
}
}
// TestErrorFromResolver verifies that resolver errors are handled correctly.
//
// If it's resource-not-found, watch will be canceled, the EDS impl will receive
// an empty EDS update, and new RPCs will fail.
//
// If it's connection error, nothing will happen. This will need to change to
// handle fallback.
func (s) TestErrorFromResolver(t *testing.T) {
edsLBCh := testutils.NewChannel()
xdsClientCh := testutils.NewChannel()
cancel := setup(edsLBCh, xdsClientCh)
defer cancel()
builder := balancer.Get(edsName)
cc := newNoopTestClientConn()
edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
if !ok {
t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)
}
defer edsB.Close()
edsB.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{
BalancerName: testBalancerNameFooBar,
EDSServiceName: testEDSClusterName,
},
})
xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
edsLB := waitForNewEDSLB(t, edsLBCh)
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil {
t.Fatalf("EDS impl got unexpected EDS response: %v", err)
}
connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
edsB.ResolverError(connectionErr)
if err := xdsC.WaitForCancelEDSWatch(); err == nil {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err == nil {
t.Fatal("eds impl got EDS resp, want timeout error")
}
resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "edsBalancer resource not found error")
edsB.ResolverError(resourceErr)
if err := xdsC.WaitForCancelEDSWatch(); err != nil {
t.Fatalf("want watch to be canceled, waitForCancel failed: %v", err)
}
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil {
t.Fatalf("EDS impl got unexpected EDS response: %v", err)
}
}
func (s) TestXDSBalancerConfigParsing(t *testing.T) {
const testEDSName = "eds.service"
var testLRSName = "lrs.server"

View File

@ -258,20 +258,25 @@ func (c *xdsclientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attr
}
}
func (c *xdsclientWrapper) close() {
if c.xdsclient != nil && c.balancerName != "" {
// Only close xdsclient if it's not from attributes.
c.xdsclient.Close()
}
func (c *xdsclientWrapper) cancelWatch() {
c.loadReportServer = nil
if c.cancelLoadReport != nil {
c.cancelLoadReport()
}
c.edsServiceName = ""
if c.cancelEndpointsWatch != nil {
c.cancelEndpointsWatch()
}
}
func (c *xdsclientWrapper) close() {
c.cancelWatch()
if c.xdsclient != nil && c.balancerName != "" {
// Only close xdsclient if it's not from attributes.
c.xdsclient.Close()
}
}
// defaultDialCreds builds a DialOption containing the credentials to be used
// while talking to the xDS server (this is done only if the xds bootstrap
// process does not return any credentials to use). If the parent channel

View File

@ -0,0 +1,60 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package client
import "fmt"
// ErrorType is the type of the error that the watcher will receive from the xds
// client.
type ErrorType int
const (
// ErrorTypeUnknown indicates the error doesn't have a specific type. It is
// the default value, and is returned if the error is not an xds error.
ErrorTypeUnknown ErrorType = iota
// ErrorTypeConnection indicates a connection error from the gRPC client.
ErrorTypeConnection
// ErrorTypeResourceNotFound indicates a resource is not found from the xds
// response. It's typically returned if the resource is removed in the xds
// server.
ErrorTypeResourceNotFound
)
type xdsClientError struct {
t ErrorType
desc string
}
func (e *xdsClientError) Error() string {
return e.desc
}
// NewErrorf creates an xds client error. The callbacks are called with this
// error, to pass additional information about the error.
func NewErrorf(t ErrorType, format string, args ...interface{}) error {
return &xdsClientError{t: t, desc: fmt.Sprintf(format, args...)}
}
// ErrType returns the error's type.
func ErrType(e error) ErrorType {
if xe, ok := e.(*xdsClientError); ok {
return xe.t
}
return ErrorTypeUnknown
}

View File

@ -143,6 +143,13 @@ func (xdsC *Client) InvokeWatchEDSCallback(update xdsclient.EndpointsUpdate, err
xdsC.edsCb(update, err)
}
// WaitForCancelEDSWatch waits for a EDS watch to be cancelled within a
// reasonable timeout, and returns testutils.ErrRecvTimeout otherwise.
func (xdsC *Client) WaitForCancelEDSWatch() error {
_, err := xdsC.edsCancelCh.Receive()
return err
}
// ReportLoadArgs wraps the arguments passed to ReportLoad.
type ReportLoadArgs struct {
// Server is the name of the server to which the load is reported.