xds: skip callback if xds client reports error (#3240)
Errors will be handled specifically, depending on whether it's a connection error or other types of errors. Without this fix, balancer's callback will be called with <nil> update, causing nil panic later.
This commit is contained in:
@ -19,7 +19,9 @@
|
|||||||
package balancer
|
package balancer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
||||||
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
|
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
|
||||||
@ -34,9 +36,11 @@ import (
|
|||||||
"google.golang.org/grpc/balancer"
|
"google.golang.org/grpc/balancer"
|
||||||
"google.golang.org/grpc/resolver"
|
"google.golang.org/grpc/resolver"
|
||||||
xdsinternal "google.golang.org/grpc/xds/internal"
|
xdsinternal "google.golang.org/grpc/xds/internal"
|
||||||
|
"google.golang.org/grpc/xds/internal/balancer/lrs"
|
||||||
xdsclient "google.golang.org/grpc/xds/internal/client"
|
xdsclient "google.golang.org/grpc/xds/internal/client"
|
||||||
"google.golang.org/grpc/xds/internal/client/bootstrap"
|
"google.golang.org/grpc/xds/internal/client/bootstrap"
|
||||||
"google.golang.org/grpc/xds/internal/client/fakexds"
|
"google.golang.org/grpc/xds/internal/client/fakexds"
|
||||||
|
"google.golang.org/grpc/xds/internal/testutils"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -164,6 +168,72 @@ func (s) TestEDSClientResponseHandling(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type testXDSClient struct {
|
||||||
|
clusterNameChan *testutils.Channel
|
||||||
|
edsCb func(*xdsclient.EDSUpdate, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestXDSClient() *testXDSClient {
|
||||||
|
return &testXDSClient{
|
||||||
|
clusterNameChan: testutils.NewChannel(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *testXDSClient) WatchEDS(clusterName string, edsCb func(*xdsclient.EDSUpdate, error)) (cancel func()) {
|
||||||
|
c.clusterNameChan.Send(clusterName)
|
||||||
|
c.edsCb = edsCb
|
||||||
|
return func() {}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *testXDSClient) ReportLoad(server string, clusterName string, loadStore lrs.Store) (cancel func()) {
|
||||||
|
panic("implement me 2")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *testXDSClient) Close() {
|
||||||
|
panic("implement me 3")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that error from the xds client is handled correctly.
|
||||||
|
func (s) TestEDSClientResponseErrorHandling(t *testing.T) {
|
||||||
|
td, cleanup := fakexds.StartServer(t)
|
||||||
|
defer cleanup()
|
||||||
|
edsRespChan := make(chan *xdsclient.EDSUpdate, 10)
|
||||||
|
newEDS := func(i *xdsclient.EDSUpdate) error {
|
||||||
|
edsRespChan <- i
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
client := newXDSClientWrapper(newEDS, func() {}, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
|
||||||
|
defer client.close()
|
||||||
|
|
||||||
|
// Create a client to be passed in attributes.
|
||||||
|
c := newTestXDSClient()
|
||||||
|
client.handleUpdate(&XDSConfig{
|
||||||
|
BalancerName: td.Address,
|
||||||
|
EDSServiceName: testEDSClusterName,
|
||||||
|
LrsLoadReportingServerName: nil,
|
||||||
|
}, attributes.New(xdsinternal.XDSClientID, c),
|
||||||
|
)
|
||||||
|
|
||||||
|
if gotClusterName, err := c.clusterNameChan.Receive(); err != nil || gotClusterName != testEDSClusterName {
|
||||||
|
t.Fatalf("got EDS watch clusterName %v, expected: %v", gotClusterName, testEDSClusterName)
|
||||||
|
}
|
||||||
|
c.edsCb(nil, fmt.Errorf("testing err"))
|
||||||
|
|
||||||
|
// The ballback is called with an error, expect no update from edsRespChan.
|
||||||
|
//
|
||||||
|
// TODO: check for loseContact() when errors indicating "lose contact" are
|
||||||
|
// handled correctly.
|
||||||
|
|
||||||
|
timer := time.NewTimer(time.Second)
|
||||||
|
defer timer.Stop()
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
case resp := <-edsRespChan:
|
||||||
|
t.Fatalf("unexpected resp: %v", resp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Test that if xds_client is in attributes, the xdsclientnew function will not
|
// Test that if xds_client is in attributes, the xdsclientnew function will not
|
||||||
// be called, and the xds_client from attributes will be used.
|
// be called, and the xds_client from attributes will be used.
|
||||||
//
|
//
|
||||||
|
@ -182,8 +182,11 @@ func (c *xdsclientWrapper) startEDSWatch(nameToWatch string) {
|
|||||||
|
|
||||||
c.edsServiceName = nameToWatch
|
c.edsServiceName = nameToWatch
|
||||||
c.cancelEDSWatch = c.xdsclient.WatchEDS(c.edsServiceName, func(update *xdsclient.EDSUpdate, err error) {
|
c.cancelEDSWatch = c.xdsclient.WatchEDS(c.edsServiceName, func(update *xdsclient.EDSUpdate, err error) {
|
||||||
// TODO: this should trigger a call to `c.loseContact`, when the error
|
if err != nil {
|
||||||
// indicates "lose contact".
|
// TODO: this should trigger a call to `c.loseContact`, when the
|
||||||
|
// error indicates "lose contact".
|
||||||
|
return
|
||||||
|
}
|
||||||
if err := c.newEDSUpdate(update); err != nil {
|
if err := c.newEDSUpdate(update); err != nil {
|
||||||
grpclog.Warningf("xds: processing new EDS update failed due to %v.", err)
|
grpclog.Warningf("xds: processing new EDS update failed due to %v.", err)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user