xds: in EDS balancer, use xds_client from attributes (#3205)
This commit is contained in:
@ -28,6 +28,7 @@ import (
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/balancer/roundrobin"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/resolver"
|
||||
@ -77,6 +78,7 @@ func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp
|
||||
updateState: x.connStateMgr.updateState,
|
||||
ClientConn: cc,
|
||||
}
|
||||
x.client = newXDSClientWrapper(x.handleEDSUpdate, x.loseContact, x.buildOpts, x.loadStore)
|
||||
go x.run()
|
||||
return x
|
||||
}
|
||||
@ -137,72 +139,6 @@ type edsBalancer struct {
|
||||
loadStore lrs.Store
|
||||
}
|
||||
|
||||
// TODO: cleanup this function, or just remove it. It was here because the xds
|
||||
// server name from service config can change, and we need to migrate from the
|
||||
// old one to the new one. Now the xds server name is specified by the bootstrap
|
||||
// file, and should never change. There's no need for this.
|
||||
func (x *edsBalancer) startNewXDSClient(u *XDSConfig) {
|
||||
// If the edsBalancer is in startup stage, then we need to apply the startup timeout for the first
|
||||
// xdsClient to get a response from the traffic director.
|
||||
if x.startup {
|
||||
x.startFallbackMonitoring()
|
||||
}
|
||||
|
||||
// Whenever service config gives a new traffic director name, we need to create an xds client to
|
||||
// connect to it. However, previous xds client should not be closed until the new one successfully
|
||||
// connects to the traffic director (i.e. get an ADS response from the traffic director). Therefore,
|
||||
// we let each new client to be responsible to close its immediate predecessor. In this way,
|
||||
// edsBalancer does not need to implement complex synchronization to achieve the same purpose.
|
||||
prevClient := x.client
|
||||
// haveGotADS is true means, this xdsClient has got ADS response from director in the past, which
|
||||
// means it can close previous client if it hasn't and it now can send lose contact signal for
|
||||
// fallback monitoring.
|
||||
var haveGotADS bool
|
||||
|
||||
// set up callbacks for the xds client.
|
||||
newADS := func(ctx context.Context, resp *xdsclient.EDSUpdate) error {
|
||||
if !haveGotADS {
|
||||
if prevClient != nil {
|
||||
prevClient.close()
|
||||
}
|
||||
haveGotADS = true
|
||||
}
|
||||
return x.newADSResponse(ctx, resp)
|
||||
}
|
||||
loseContact := func(ctx context.Context) {
|
||||
// loseContact signal is only useful when the current xds client has received ADS response before,
|
||||
// and has not been closed by later xds client.
|
||||
if haveGotADS {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
x.loseContact(ctx)
|
||||
}
|
||||
}
|
||||
exitCleanup := func() {
|
||||
// Each xds client is responsible to close its predecessor if there's one. There are two paths
|
||||
// for a xds client to close its predecessor:
|
||||
// 1. Once it receives its first ADS response.
|
||||
// 2. It hasn't received its first ADS response yet, but its own successor has received ADS
|
||||
// response (which triggers the exit of it). Therefore, it needs to close its predecessor if
|
||||
// it has one.
|
||||
// Here the exitCleanup is for the 2nd path.
|
||||
if !haveGotADS && prevClient != nil {
|
||||
prevClient.close()
|
||||
}
|
||||
}
|
||||
// TODO: handle cfg.LrsLoadReportingServerName and remove log.
|
||||
if u.LrsLoadReportingServerName != "" {
|
||||
grpclog.Warningf("xds: lrsLoadReportingServerName is not empty, but is not handled")
|
||||
}
|
||||
|
||||
// TODO: stop using u.BalancerName. The value should come from bootstrap
|
||||
// file. It's only used in tests now.
|
||||
x.client = newXDSClientWrapper(u.BalancerName, u.EDSServiceName, x.buildOpts, u.LrsLoadReportingServerName, x.loadStore, newADS, loseContact, exitCleanup)
|
||||
}
|
||||
|
||||
// run gets executed in a goroutine once edsBalancer is created. It monitors updates from grpc,
|
||||
// xdsClient and load balancer. It synchronizes the operations that happen inside edsBalancer. It
|
||||
// exits when edsBalancer is closed.
|
||||
@ -252,44 +188,39 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
|
||||
return
|
||||
}
|
||||
|
||||
x.client.handleUpdate(cfg, u.ResolverState.Attributes)
|
||||
|
||||
if x.config == nil {
|
||||
// If this is the first config, the edsBalancer is in startup stage.
|
||||
// We need to apply the startup timeout for the first xdsClient, in
|
||||
// case it doesn't get a response from the traffic director in time.
|
||||
if x.startup {
|
||||
x.startFallbackMonitoring()
|
||||
}
|
||||
x.config = cfg
|
||||
x.fallbackInitData = &resolver.State{
|
||||
Addresses: u.ResolverState.Addresses,
|
||||
// TODO(yuxuanli): get the fallback balancer config once the validation change completes, where
|
||||
// we can pass along the config struct.
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// We will update the xdsLB with the new child policy, if we got a
|
||||
// different one.
|
||||
if x.xdsLB != nil && !reflect.DeepEqual(cfg.ChildPolicy, x.config.ChildPolicy) {
|
||||
if cfg.ChildPolicy != nil {
|
||||
x.xdsLB.HandleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config)
|
||||
} else {
|
||||
x.xdsLB.HandleChildPolicy(roundrobin.Name, nil)
|
||||
}
|
||||
}
|
||||
|
||||
var fallbackChanged bool
|
||||
// service config has been updated.
|
||||
if !reflect.DeepEqual(cfg, x.config) {
|
||||
if x.config == nil {
|
||||
// The first time we get config, we just need to start the xdsClient.
|
||||
x.startNewXDSClient(cfg)
|
||||
x.config = cfg
|
||||
x.fallbackInitData = &resolver.State{
|
||||
Addresses: u.ResolverState.Addresses,
|
||||
// TODO(yuxuanli): get the fallback balancer config once the validation change completes, where
|
||||
// we can pass along the config struct.
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Create a different xds_client if part of the config is different.
|
||||
//
|
||||
// TODO: this and all client related code, including comparing new
|
||||
// config with old, creating new client, should be moved to a
|
||||
// dedicated struct, and handled together.
|
||||
if cfg.BalancerName != x.config.BalancerName || cfg.EDSServiceName != x.config.EDSServiceName || cfg.LrsLoadReportingServerName != x.config.LrsLoadReportingServerName {
|
||||
x.startNewXDSClient(cfg)
|
||||
}
|
||||
// We will update the xdsLB with the new child policy, if we got a
|
||||
// different one.
|
||||
if x.xdsLB != nil && !reflect.DeepEqual(cfg.ChildPolicy, x.config.ChildPolicy) {
|
||||
if cfg.ChildPolicy != nil {
|
||||
x.xdsLB.HandleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config)
|
||||
} else {
|
||||
x.xdsLB.HandleChildPolicy("round_robin", cfg.ChildPolicy.Config)
|
||||
}
|
||||
}
|
||||
|
||||
if x.fallbackLB != nil && !reflect.DeepEqual(cfg.FallBackPolicy, x.config.FallBackPolicy) {
|
||||
x.fallbackLB.Close()
|
||||
x.buildFallBackBalancer(cfg)
|
||||
fallbackChanged = true
|
||||
}
|
||||
if x.fallbackLB != nil && !reflect.DeepEqual(cfg.FallBackPolicy, x.config.FallBackPolicy) {
|
||||
x.fallbackLB.Close()
|
||||
x.buildFallBackBalancer(cfg)
|
||||
fallbackChanged = true
|
||||
}
|
||||
|
||||
if x.fallbackLB != nil && (!reflect.DeepEqual(x.fallbackInitData.Addresses, u.ResolverState.Addresses) || fallbackChanged) {
|
||||
@ -312,20 +243,12 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
|
||||
|
||||
func (x *edsBalancer) handleXDSClientUpdate(update interface{}) {
|
||||
switch u := update.(type) {
|
||||
case *edsResp:
|
||||
select {
|
||||
case <-u.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
// TODO: this func should accept (*xdsclient.EDSUpdate, error), and process
|
||||
// the error, instead of having a separate loseContact signal.
|
||||
case *xdsclient.EDSUpdate:
|
||||
x.cancelFallbackAndSwitchEDSBalancerIfNecessary()
|
||||
x.xdsLB.HandleEDSResponse(u.resp)
|
||||
x.xdsLB.HandleEDSResponse(u)
|
||||
case *loseContact:
|
||||
select {
|
||||
case <-u.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
// if we are already doing fallback monitoring, then we ignore new loseContact signal.
|
||||
if x.inFallbackMonitor {
|
||||
return
|
||||
@ -418,30 +341,25 @@ func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type edsResp struct {
|
||||
ctx context.Context
|
||||
resp *xdsclient.EDSUpdate
|
||||
}
|
||||
|
||||
func (x *edsBalancer) newADSResponse(ctx context.Context, resp *xdsclient.EDSUpdate) error {
|
||||
func (x *edsBalancer) handleEDSUpdate(resp *xdsclient.EDSUpdate) error {
|
||||
// TODO: this function should take (resp, error), and send them together on
|
||||
// the channel. There doesn't need to be a separate `loseContact` function.
|
||||
select {
|
||||
case x.xdsClientUpdate <- &edsResp{ctx: ctx, resp: resp}:
|
||||
case x.xdsClientUpdate <- resp:
|
||||
case <-x.ctx.Done():
|
||||
case <-ctx.Done():
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type loseContact struct {
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (x *edsBalancer) loseContact(ctx context.Context) {
|
||||
// TODO: delete loseContact when handleEDSUpdate takes (resp, error).
|
||||
func (x *edsBalancer) loseContact() {
|
||||
select {
|
||||
case x.xdsClientUpdate <- &loseContact{ctx: ctx}:
|
||||
case x.xdsClientUpdate <- &loseContact{}:
|
||||
case <-x.ctx.Done():
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -19,8 +19,6 @@
|
||||
package balancer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
@ -36,11 +34,14 @@ import (
|
||||
wrpb "github.com/golang/protobuf/ptypes/wrappers"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/attributes"
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/status"
|
||||
xdsinternal "google.golang.org/grpc/xds/internal"
|
||||
xdsclient "google.golang.org/grpc/xds/internal/client"
|
||||
"google.golang.org/grpc/xds/internal/client/bootstrap"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -107,6 +108,8 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
// TODO: remove all usage of testTrafficDirector, and use fakexds server
|
||||
// instead.
|
||||
type testTrafficDirector struct {
|
||||
reqChan chan *request
|
||||
respChan chan *response
|
||||
@ -123,26 +126,22 @@ type response struct {
|
||||
}
|
||||
|
||||
func (ttd *testTrafficDirector) StreamAggregatedResources(s xdsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
|
||||
for {
|
||||
req, err := s.Recv()
|
||||
if err != nil {
|
||||
go func() {
|
||||
for {
|
||||
req, err := s.Recv()
|
||||
if err != nil {
|
||||
ttd.reqChan <- &request{
|
||||
req: nil,
|
||||
err: err,
|
||||
}
|
||||
return
|
||||
}
|
||||
ttd.reqChan <- &request{
|
||||
req: nil,
|
||||
err: err,
|
||||
req: req,
|
||||
err: nil,
|
||||
}
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
ttd.reqChan <- &request{
|
||||
req: req,
|
||||
err: nil,
|
||||
}
|
||||
if req.TypeUrl == edsType {
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
@ -237,12 +236,17 @@ func testXdsClientResponseHandling(t *testing.T, test *testConfig) {
|
||||
addr, td, _, cleanup := setupServer(t)
|
||||
defer cleanup()
|
||||
adsChan := make(chan *xdsclient.EDSUpdate, 10)
|
||||
newADS := func(ctx context.Context, i *xdsclient.EDSUpdate) error {
|
||||
newADS := func(i *xdsclient.EDSUpdate) error {
|
||||
adsChan <- i
|
||||
return nil
|
||||
}
|
||||
client := newXDSClientWrapper(addr, test.edsServiceName, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, "", nil, newADS, func(context.Context) {}, func() {})
|
||||
client := newXDSClientWrapper(newADS, func() {}, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
|
||||
defer client.close()
|
||||
client.handleUpdate(&XDSConfig{
|
||||
BalancerName: addr,
|
||||
EDSServiceName: test.edsServiceName,
|
||||
LrsLoadReportingServerName: "",
|
||||
}, nil)
|
||||
|
||||
for _, expectedReq := range test.expectedRequests {
|
||||
req := td.getReq()
|
||||
@ -266,3 +270,174 @@ func testXdsClientResponseHandling(t *testing.T, test *testConfig) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Test that if xds_client is in attributes, the xdsclientnew function will not
|
||||
// be called, and the xds_client from attributes will be used.
|
||||
//
|
||||
// And also that when xds_client in attributes is updated, the new one will be
|
||||
// used, and watch will be restarted.
|
||||
func (s) TestXdsClientInAttributes(t *testing.T) {
|
||||
adsChan := make(chan *xdsclient.EDSUpdate, 10)
|
||||
newADS := func(i *xdsclient.EDSUpdate) error {
|
||||
adsChan <- i
|
||||
return nil
|
||||
}
|
||||
|
||||
oldxdsclientNew := xdsclientNew
|
||||
xdsclientNew = func(opts xdsclient.Options) (clientInterface xdsClientInterface, e error) {
|
||||
t.Fatalf("unexpected call to xdsclientNew when xds_client is set in attributes")
|
||||
return nil, nil
|
||||
}
|
||||
defer func() { xdsclientNew = oldxdsclientNew }()
|
||||
|
||||
addr, td, _, cleanup := setupServer(t)
|
||||
defer cleanup()
|
||||
// Create a client to be passed in attributes.
|
||||
c, _ := oldxdsclientNew(xdsclient.Options{
|
||||
Config: bootstrap.Config{
|
||||
BalancerName: addr,
|
||||
Creds: grpc.WithInsecure(),
|
||||
NodeProto: &corepb.Node{},
|
||||
},
|
||||
})
|
||||
// Need to manually close c because xdsclientWrapper won't close it (it's
|
||||
// from attributes).
|
||||
defer c.Close()
|
||||
|
||||
client := newXDSClientWrapper(newADS, func() {}, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
|
||||
defer client.close()
|
||||
|
||||
client.handleUpdate(
|
||||
&XDSConfig{EDSServiceName: testEDSClusterName, LrsLoadReportingServerName: ""},
|
||||
attributes.New(xdsinternal.XDSClientID, c),
|
||||
)
|
||||
|
||||
expectedReq := &xdspb.DiscoveryRequest{
|
||||
TypeUrl: edsType,
|
||||
ResourceNames: []string{testEDSClusterName},
|
||||
Node: &corepb.Node{},
|
||||
}
|
||||
|
||||
// Make sure the requests are sent to the correct td.
|
||||
req := td.getReq()
|
||||
if req.err != nil {
|
||||
t.Fatalf("ads RPC failed with err: %v", req.err)
|
||||
}
|
||||
if !proto.Equal(req.req, expectedReq) {
|
||||
t.Fatalf("got ADS request %T, expected: %T, diff: %s", req.req, expectedReq, cmp.Diff(req.req, expectedReq, cmp.Comparer(proto.Equal)))
|
||||
}
|
||||
|
||||
addr2, td2, _, cleanup2 := setupServer(t)
|
||||
defer cleanup2()
|
||||
// Create a client to be passed in attributes.
|
||||
c2, _ := oldxdsclientNew(xdsclient.Options{
|
||||
Config: bootstrap.Config{
|
||||
BalancerName: addr2,
|
||||
Creds: grpc.WithInsecure(),
|
||||
NodeProto: &corepb.Node{},
|
||||
},
|
||||
})
|
||||
// Need to manually close c because xdsclientWrapper won't close it (it's
|
||||
// from attributes).
|
||||
defer c2.Close()
|
||||
|
||||
// Update with a new xds_client in attributes.
|
||||
client.handleUpdate(
|
||||
&XDSConfig{EDSServiceName: "", LrsLoadReportingServerName: ""},
|
||||
attributes.New(xdsinternal.XDSClientID, c2),
|
||||
)
|
||||
|
||||
expectedReq2 := &xdspb.DiscoveryRequest{
|
||||
TypeUrl: edsType,
|
||||
// The edsServiceName in new update is an empty string, user's dial
|
||||
// target should be used as eds name to watch.
|
||||
ResourceNames: []string{testServiceName},
|
||||
Node: &corepb.Node{},
|
||||
}
|
||||
|
||||
// Make sure the requests are sent to the correct td.
|
||||
req2 := td2.getReq()
|
||||
if req.err != nil {
|
||||
t.Fatalf("ads RPC failed with err: %v", req.err)
|
||||
}
|
||||
if !proto.Equal(req2.req, expectedReq2) {
|
||||
t.Fatalf("got ADS request %T, expected: %T, diff: %s", req2.req, expectedReq, cmp.Diff(req2.req, expectedReq2, cmp.Comparer(proto.Equal)))
|
||||
}
|
||||
}
|
||||
|
||||
// Test that when edsServiceName from service config is updated, the new one
|
||||
// will be watched.
|
||||
func (s) TestEDSServiceNameUpdate(t *testing.T) {
|
||||
adsChan := make(chan *xdsclient.EDSUpdate, 10)
|
||||
newADS := func(i *xdsclient.EDSUpdate) error {
|
||||
adsChan <- i
|
||||
return nil
|
||||
}
|
||||
|
||||
oldxdsclientNew := xdsclientNew
|
||||
xdsclientNew = func(opts xdsclient.Options) (clientInterface xdsClientInterface, e error) {
|
||||
t.Fatalf("unexpected call to xdsclientNew when xds_client is set in attributes")
|
||||
return nil, nil
|
||||
}
|
||||
defer func() { xdsclientNew = oldxdsclientNew }()
|
||||
|
||||
addr, td, _, cleanup := setupServer(t)
|
||||
defer cleanup()
|
||||
// Create a client to be passed in attributes.
|
||||
c, _ := oldxdsclientNew(xdsclient.Options{
|
||||
Config: bootstrap.Config{
|
||||
BalancerName: addr,
|
||||
Creds: grpc.WithInsecure(),
|
||||
NodeProto: &corepb.Node{},
|
||||
},
|
||||
})
|
||||
// Need to manually close c because xdsclientWrapper won't close it (it's
|
||||
// from attributes).
|
||||
defer c.Close()
|
||||
|
||||
client := newXDSClientWrapper(newADS, func() {}, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
|
||||
defer client.close()
|
||||
|
||||
client.handleUpdate(
|
||||
&XDSConfig{EDSServiceName: testEDSClusterName, LrsLoadReportingServerName: ""},
|
||||
attributes.New(xdsinternal.XDSClientID, c),
|
||||
)
|
||||
|
||||
expectedReq := &xdspb.DiscoveryRequest{
|
||||
TypeUrl: edsType,
|
||||
ResourceNames: []string{testEDSClusterName},
|
||||
Node: &corepb.Node{},
|
||||
}
|
||||
|
||||
// Make sure the requests are sent to the correct td.
|
||||
req := td.getReq()
|
||||
if req.err != nil {
|
||||
t.Fatalf("ads RPC failed with err: %v", req.err)
|
||||
}
|
||||
if !proto.Equal(req.req, expectedReq) {
|
||||
t.Fatalf("got ADS request %T, expected: %T, diff: %s", req.req, expectedReq, cmp.Diff(req.req, expectedReq, cmp.Comparer(proto.Equal)))
|
||||
}
|
||||
|
||||
// Update with a new edsServiceName.
|
||||
client.handleUpdate(
|
||||
&XDSConfig{EDSServiceName: "", LrsLoadReportingServerName: ""},
|
||||
attributes.New(xdsinternal.XDSClientID, c),
|
||||
)
|
||||
|
||||
expectedReq2 := &xdspb.DiscoveryRequest{
|
||||
TypeUrl: edsType,
|
||||
// The edsServiceName in new update is an empty string, user's dial
|
||||
// target should be used as eds name to watch.
|
||||
ResourceNames: []string{testServiceName},
|
||||
Node: &corepb.Node{},
|
||||
}
|
||||
|
||||
// Make sure the requests are sent to the correct td.
|
||||
req2 := td.getReq()
|
||||
if req.err != nil {
|
||||
t.Fatalf("ads RPC failed with err: %v", req.err)
|
||||
}
|
||||
if !proto.Equal(req2.req, expectedReq2) {
|
||||
t.Fatalf("got ADS request %T, expected: %T, diff: %s", req2.req, expectedReq, cmp.Diff(req2.req, expectedReq2, cmp.Comparer(proto.Equal)))
|
||||
}
|
||||
}
|
||||
|
@ -19,11 +19,11 @@
|
||||
package balancer
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/attributes"
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
xdsinternal "google.golang.org/grpc/xds/internal"
|
||||
"google.golang.org/grpc/xds/internal/balancer/lrs"
|
||||
xdsclient "google.golang.org/grpc/xds/internal/client"
|
||||
"google.golang.org/grpc/xds/internal/client/bootstrap"
|
||||
@ -44,88 +44,224 @@ var (
|
||||
bootstrapConfigNew = bootstrap.NewConfig
|
||||
)
|
||||
|
||||
// xdsclientWpapper is responsible for getting the xds client from attributes or
|
||||
// xdsclientWrapper is responsible for getting the xds client from attributes or
|
||||
// creating a new xds client, and start watching EDS. The given callbacks will
|
||||
// be called with EDS updates or errors.
|
||||
type xdsclientWrapper struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
cancelWatch func()
|
||||
cancelLoadReport func()
|
||||
cleanup func()
|
||||
newEDSUpdate func(*xdsclient.EDSUpdate) error
|
||||
loseContact func()
|
||||
bbo balancer.BuildOptions
|
||||
loadStore lrs.Store
|
||||
|
||||
balancerName string
|
||||
// xdsclient could come from attributes, or created with balancerName.
|
||||
xdsclient xdsClientInterface
|
||||
|
||||
// edsServiceName is the edsServiceName currently being watched, not
|
||||
// necessarily the edsServiceName from service config.
|
||||
//
|
||||
// If edsServiceName from service config is an empty string, this will be user's
|
||||
// dial target (because that's what we use to watch EDS).
|
||||
//
|
||||
// TODO: remove the empty string related behavior, when we switch to always
|
||||
// do CDS.
|
||||
edsServiceName string
|
||||
cancelEDSWatch func()
|
||||
loadReportServer string
|
||||
cancelLoadReport func()
|
||||
}
|
||||
|
||||
func (c *xdsclientWrapper) close() {
|
||||
if c.cancelLoadReport != nil {
|
||||
c.cancelLoadReport()
|
||||
// newXDSClientWrapper creates an empty xds_client wrapper that does nothing. It
|
||||
// can accept xds_client configs, to new/switch xds_client to use.
|
||||
//
|
||||
// The given callbacks won't be called until the underlying xds_client is
|
||||
// working and sends updates.
|
||||
func newXDSClientWrapper(newEDSUpdate func(*xdsclient.EDSUpdate) error, loseContact func(), bbo balancer.BuildOptions, loadStore lrs.Store) *xdsclientWrapper {
|
||||
return &xdsclientWrapper{
|
||||
newEDSUpdate: newEDSUpdate,
|
||||
loseContact: loseContact,
|
||||
bbo: bbo,
|
||||
loadStore: loadStore,
|
||||
}
|
||||
if c.cancelWatch != nil {
|
||||
c.cancelWatch()
|
||||
}
|
||||
if c.xdsclient != nil {
|
||||
// TODO: this shouldn't close xdsclient if it's from attributes.
|
||||
c.xdsclient.Close()
|
||||
}
|
||||
c.cleanup()
|
||||
}
|
||||
|
||||
func newXDSClientWrapper(balancerName string, edsServiceName string, bbo balancer.BuildOptions, loadReportServer string, loadStore lrs.Store, newADS func(context.Context, *xdsclient.EDSUpdate) error, loseContact func(ctx context.Context), exitCleanup func()) *xdsclientWrapper {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ret := &xdsclientWrapper{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
cleanup: exitCleanup,
|
||||
// replaceXDSClient replaces xdsclient fields to the newClient if they are
|
||||
// different. If xdsclient is replaced, the balancerName field will also be
|
||||
// updated to newBalancerName.
|
||||
//
|
||||
// If the old xdsclient is replaced, and was created locally (not from
|
||||
// attributes), it will be closed.
|
||||
//
|
||||
// It returns whether xdsclient is replaced.
|
||||
func (c *xdsclientWrapper) replaceXDSClient(newClient xdsClientInterface, newBalancerName string) bool {
|
||||
if c.xdsclient == newClient {
|
||||
return false
|
||||
}
|
||||
oldClient := c.xdsclient
|
||||
oldBalancerName := c.balancerName
|
||||
c.xdsclient = newClient
|
||||
c.balancerName = newBalancerName
|
||||
if oldBalancerName != "" {
|
||||
// OldBalancerName!="" means if the old client was not from attributes.
|
||||
oldClient.Close()
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// updateXDSClient sets xdsclient in wrapper to the correct one based on the
|
||||
// attributes and service config.
|
||||
//
|
||||
// If client is found in attributes, it will be used, but we also need to decide
|
||||
// whether to close the old client.
|
||||
// - if old client was created locally (balancerName is not ""), close it and
|
||||
// replace it
|
||||
// - if old client was from previous attributes, only replace it, but don't
|
||||
// close it
|
||||
//
|
||||
// If client is not found in attributes, will need to create a new one only if
|
||||
// the balancerName (from bootstrap file or from service config) changed.
|
||||
// - if balancer names are the same, do nothing, and return false
|
||||
// - if balancer names are different, create new one, and return true
|
||||
func (c *xdsclientWrapper) updateXDSClient(config *XDSConfig, attr *attributes.Attributes) bool {
|
||||
if attr != nil {
|
||||
if clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface); clientFromAttr != nil {
|
||||
// This will also clear balancerName, to indicate that client is
|
||||
// from attributes.
|
||||
return c.replaceXDSClient(clientFromAttr, "")
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: get xdsclient from Attributes instead of creating a new one.
|
||||
|
||||
config := bootstrapConfigNew()
|
||||
if config.BalancerName == "" {
|
||||
config.BalancerName = balancerName
|
||||
clientConfig := bootstrapConfigNew()
|
||||
if clientConfig.BalancerName == "" {
|
||||
clientConfig.BalancerName = config.BalancerName
|
||||
}
|
||||
if config.Creds == nil {
|
||||
|
||||
if c.balancerName == clientConfig.BalancerName {
|
||||
return false
|
||||
}
|
||||
|
||||
if clientConfig.Creds == nil {
|
||||
// TODO: Once we start supporting a mechanism to register credential
|
||||
// types, a failure to find the credential type mentioned in the
|
||||
// bootstrap file should result in a failure, and not in using
|
||||
// credentials from the parent channel (passed through the
|
||||
// resolver.BuildOptions).
|
||||
config.Creds = defaultDialCreds(config.BalancerName, bbo)
|
||||
clientConfig.Creds = defaultDialCreds(clientConfig.BalancerName, c.bbo)
|
||||
}
|
||||
|
||||
var dopts []grpc.DialOption
|
||||
if bbo.Dialer != nil {
|
||||
dopts = []grpc.DialOption{grpc.WithContextDialer(bbo.Dialer)}
|
||||
if dialer := c.bbo.Dialer; dialer != nil {
|
||||
dopts = []grpc.DialOption{grpc.WithContextDialer(dialer)}
|
||||
}
|
||||
|
||||
c, err := xdsclientNew(xdsclient.Options{Config: *config, DialOpts: dopts})
|
||||
newClient, err := xdsclientNew(xdsclient.Options{Config: *clientConfig, DialOpts: dopts})
|
||||
if err != nil {
|
||||
grpclog.Warningf("failed to create xdsclient, error: %v", err)
|
||||
return ret
|
||||
// This should never fail. xdsclientnew does a non-blocking dial, and
|
||||
// all the config passed in should be validated.
|
||||
//
|
||||
// This could leave c.xdsclient as nil if this is the first update.
|
||||
grpclog.Warningf("eds: failed to create xdsclient, error: %v", err)
|
||||
return false
|
||||
}
|
||||
ret.xdsclient = c
|
||||
return c.replaceXDSClient(newClient, clientConfig.BalancerName)
|
||||
}
|
||||
|
||||
// The clusterName to watch should come from CDS response, via service
|
||||
// config. If it's an empty string, fallback user's dial target.
|
||||
nameToWatch := edsServiceName
|
||||
if nameToWatch == "" {
|
||||
grpclog.Warningf("eds: cluster name to watch is an empty string. Fallback to user's dial target")
|
||||
nameToWatch = bbo.Target.Endpoint
|
||||
// startEDSWatch starts the EDS watch. Caller can call this when the xds_client
|
||||
// is updated, or the edsServiceName is updated.
|
||||
//
|
||||
// Note that if there's already a watch in progress, it's not explicitly
|
||||
// canceled. Because for each xds_client, there should be only one EDS watch in
|
||||
// progress. So a new EDS watch implicitly cancels the previous one.
|
||||
//
|
||||
// This usually means load report needs to be restarted, but this function does
|
||||
// NOT do that. Caller needs to call startLoadReport separately.
|
||||
func (c *xdsclientWrapper) startEDSWatch(nameToWatch string) {
|
||||
if c.xdsclient == nil {
|
||||
grpclog.Warningf("xds: xdsclient is nil when trying to start an EDS watch. This means xdsclient wasn't passed in from the resolver, and xdsclient.New failed")
|
||||
return
|
||||
}
|
||||
ret.cancelWatch = ret.xdsclient.WatchEDS(nameToWatch, func(update *xdsclient.EDSUpdate, err error) {
|
||||
if err != nil {
|
||||
loseContact(ret.ctx)
|
||||
return
|
||||
}
|
||||
if err := newADS(ret.ctx, update); err != nil {
|
||||
|
||||
c.edsServiceName = nameToWatch
|
||||
c.cancelEDSWatch = c.xdsclient.WatchEDS(c.edsServiceName, func(update *xdsclient.EDSUpdate, err error) {
|
||||
// TODO: this should trigger a call to `c.loseContact`, when the error
|
||||
// indicates "lose contact".
|
||||
if err := c.newEDSUpdate(update); err != nil {
|
||||
grpclog.Warningf("xds: processing new EDS update failed due to %v.", err)
|
||||
}
|
||||
})
|
||||
if loadStore != nil {
|
||||
ret.cancelLoadReport = ret.xdsclient.ReportLoad(loadReportServer, nameToWatch, loadStore)
|
||||
}
|
||||
|
||||
// startLoadReport starts load reporting. If there's already a load reporting in
|
||||
// progress, it cancels that.
|
||||
//
|
||||
// Caller can cal this when the loadReportServer name changes, but
|
||||
// edsServiceName doesn't (so we only need to restart load reporting, not EDS
|
||||
// watch).
|
||||
func (c *xdsclientWrapper) startLoadReport(edsServiceNameBeingWatched string, loadReportServer string) {
|
||||
if c.xdsclient == nil {
|
||||
grpclog.Warningf("xds: xdsclient is nil when trying to start load reporting. This means xdsclient wasn't passed in from the resolver, and xdsclient.New failed")
|
||||
return
|
||||
}
|
||||
if c.loadStore != nil {
|
||||
if c.cancelLoadReport != nil {
|
||||
c.cancelLoadReport()
|
||||
}
|
||||
c.loadReportServer = loadReportServer
|
||||
c.cancelLoadReport = c.xdsclient.ReportLoad(c.loadReportServer, edsServiceNameBeingWatched, c.loadStore)
|
||||
}
|
||||
}
|
||||
|
||||
// handleUpdate applies the service config and attributes updates to the client,
|
||||
// including updating the xds_client to use, and updating the EDS name to watch.
|
||||
func (c *xdsclientWrapper) handleUpdate(config *XDSConfig, attr *attributes.Attributes) {
|
||||
clientChanged := c.updateXDSClient(config, attr)
|
||||
|
||||
var (
|
||||
restartWatchEDS bool
|
||||
restartLoadReport bool
|
||||
)
|
||||
|
||||
// The clusterName to watch should come from CDS response, via service
|
||||
// config. If it's an empty string, fallback user's dial target.
|
||||
nameToWatch := config.EDSServiceName
|
||||
if nameToWatch == "" {
|
||||
grpclog.Warningf("eds: cluster name to watch is an empty string. Fallback to user's dial target")
|
||||
nameToWatch = c.bbo.Target.Endpoint
|
||||
}
|
||||
|
||||
// Need to restart EDS watch when one of the following happens:
|
||||
// - the xds_client is updated
|
||||
// - the xds_client didn't change, but the edsServiceName changed
|
||||
//
|
||||
// Only need to restart load reporting when:
|
||||
// - no need to restart EDS, but loadReportServer name changed
|
||||
if clientChanged || c.edsServiceName != nameToWatch {
|
||||
restartWatchEDS = true
|
||||
restartLoadReport = true
|
||||
} else if c.loadReportServer != config.LrsLoadReportingServerName {
|
||||
restartLoadReport = true
|
||||
}
|
||||
|
||||
if restartWatchEDS {
|
||||
c.startEDSWatch(nameToWatch)
|
||||
}
|
||||
|
||||
if restartLoadReport {
|
||||
c.startLoadReport(nameToWatch, config.LrsLoadReportingServerName)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *xdsclientWrapper) close() {
|
||||
if c.xdsclient != nil && c.balancerName != "" {
|
||||
// Only close xdsclient if it's not from attributes.
|
||||
c.xdsclient.Close()
|
||||
}
|
||||
|
||||
if c.cancelLoadReport != nil {
|
||||
c.cancelLoadReport()
|
||||
}
|
||||
if c.cancelEDSWatch != nil {
|
||||
c.cancelEDSWatch()
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
// defaultDialCreds builds a DialOption containing the credentials to be used
|
||||
|
@ -21,7 +21,6 @@ package balancer
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
@ -264,7 +263,7 @@ func (c *fakeXDSClient) WatchEDS(clusterName string, edsCb func(*xdsclient.EDSUp
|
||||
}
|
||||
|
||||
func (c *fakeXDSClient) callEDSCallback(u *xdsclient.EDSUpdate, err error) {
|
||||
t := time.NewTimer(time.Second)
|
||||
t := time.NewTimer(1 * time.Second)
|
||||
select {
|
||||
case <-c.edsCbReceived:
|
||||
t.Stop()
|
||||
@ -529,7 +528,7 @@ func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) {
|
||||
defer lb.Close()
|
||||
|
||||
cfg := XDSConfig{
|
||||
BalancerName: "",
|
||||
BalancerName: "wrong-balancer-name",
|
||||
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
|
||||
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
|
||||
}
|
||||
@ -558,8 +557,10 @@ func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) {
|
||||
t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s")
|
||||
}
|
||||
|
||||
// Callback with an error, the balancer should switch to fallback.
|
||||
testXDSClient.callEDSCallback(nil, fmt.Errorf("xds client error"))
|
||||
// Call loseContact explicitly, error in EDS callback is not handled.
|
||||
// Eventually, this should call EDS callback with an error that indicates
|
||||
// "lost contact".
|
||||
lb.loseContact()
|
||||
|
||||
// verify fallback balancer B takes over
|
||||
select {
|
||||
@ -614,7 +615,7 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) {
|
||||
defer lb.Close()
|
||||
|
||||
cfg := &XDSConfig{
|
||||
BalancerName: "",
|
||||
BalancerName: "wrong-balancer-name",
|
||||
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
|
||||
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
|
||||
EDSServiceName: testEDSClusterName,
|
||||
@ -653,8 +654,10 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) {
|
||||
// lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record
|
||||
// to make sure there's a new one created and get the pointer to it.
|
||||
lbABuilder.clearLastBalancer()
|
||||
// Callback with an error, the balancer should switch to fallback.
|
||||
testXDSClient.callEDSCallback(nil, fmt.Errorf("xds client error"))
|
||||
// Call loseContact explicitly, error in EDS callback is not handled.
|
||||
// Eventually, this should call EDS callback with an error that indicates
|
||||
// "lost contact".
|
||||
lb.loseContact()
|
||||
|
||||
// switch to fallback
|
||||
// fallback balancer A takes over
|
||||
@ -703,7 +706,7 @@ func (s) TestXdsBalancerFallBackSignalFromEdsBalancer(t *testing.T) {
|
||||
defer lb.Close()
|
||||
|
||||
cfg := &XDSConfig{
|
||||
BalancerName: "",
|
||||
BalancerName: "wrong-balancer-name",
|
||||
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
|
||||
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
|
||||
}
|
||||
@ -741,8 +744,10 @@ func (s) TestXdsBalancerFallBackSignalFromEdsBalancer(t *testing.T) {
|
||||
// lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record
|
||||
// to make sure there's a new one created and get the pointer to it.
|
||||
lbABuilder.clearLastBalancer()
|
||||
// Callback with an error, the balancer should switch to fallback.
|
||||
testXDSClient.callEDSCallback(nil, fmt.Errorf("xds client error"))
|
||||
// Call loseContact explicitly, error in EDS callback is not handled.
|
||||
// Eventually, this should call EDS callback with an error that indicates
|
||||
// "lost contact".
|
||||
lb.loseContact()
|
||||
|
||||
// switch to fallback
|
||||
// fallback balancer A takes over
|
||||
|
Reference in New Issue
Block a user