xds: add new fields to XDSConfig (#3100)

Fields are added in: https://github.com/grpc/grpc-proto/pull/64

Other changes:
 - Move XDSConfig from internal to balancer
   - Later we will add a separate config for CDS balancer
 - generate service_config.pb.go and test with json generated from proto message
This commit is contained in:
Menghan Li
2019-11-05 09:17:10 -08:00
committed by GitHub
parent bbd4b7ae28
commit b09352fc84
11 changed files with 1530 additions and 174 deletions

View File

@ -0,0 +1,59 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//go:generate ./regenerate.sh
package grpc_service_config_test
import (
"testing"
"github.com/golang/protobuf/jsonpb"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
scpb "google.golang.org/grpc/internal/proto/grpc_service_config"
)
// TestXdsConfigMarshalToJSON is an example to print json format of xds_config.
func TestXdsConfigMarshalToJSON(t *testing.T) {
c := &scpb.XdsConfig{
ChildPolicy: []*scpb.LoadBalancingConfig{
{Policy: &scpb.LoadBalancingConfig_Grpclb{
Grpclb: &scpb.GrpcLbConfig{},
}},
{Policy: &scpb.LoadBalancingConfig_RoundRobin{
RoundRobin: &scpb.RoundRobinConfig{},
}},
},
FallbackPolicy: []*scpb.LoadBalancingConfig{
{Policy: &scpb.LoadBalancingConfig_Grpclb{
Grpclb: &scpb.GrpcLbConfig{},
}},
{Policy: &scpb.LoadBalancingConfig_PickFirst{
PickFirst: &scpb.PickFirstConfig{},
}},
},
EdsServiceName: "eds.service.name",
LrsLoadReportingServerName: &wrapperspb.StringValue{
Value: "lrs.server.name",
},
}
j, err := (&jsonpb.Marshaler{}).MarshalToString(c)
if err != nil {
t.Fatalf("failed to marshal proto to json: %v", err)
}
t.Logf(j)
}

View File

@ -0,0 +1,35 @@
#!/bin/bash
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -eux -o pipefail
TMP=$(mktemp -d)
function finish {
rm -rf "$TMP"
}
trap finish EXIT
pushd "$TMP"
mkdir -p grpc/service_config
curl https://raw.githubusercontent.com/grpc/grpc-proto/master/grpc/service_config/service_config.proto > grpc/service_config/service_config.proto
mkdir -p google/rpc
curl https://raw.githubusercontent.com/googleapis/googleapis/master/google/rpc/code.proto > google/rpc/code.proto
protoc --go_out=plugins=grpc,paths=source_relative:. -I. grpc/service_config/*.proto
popd
rm -f ./*.pb.go
cp "$TMP"/grpc/service_config/*.pb.go ./

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,117 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package balancer
import (
"encoding/json"
"fmt"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/serviceconfig"
)
// XDSConfig represents the loadBalancingConfig section of the service config
// for xDS balancers.
type XDSConfig struct {
serviceconfig.LoadBalancingConfig
// BalancerName represents the load balancer to use.
BalancerName string
// ChildPolicy represents the load balancing config for the child
// policy.
ChildPolicy *loadBalancingConfig
// FallBackPolicy represents the load balancing config for the
// fallback.
FallBackPolicy *loadBalancingConfig
// Name to use in EDS query. If not present, defaults to the server
// name from the target URI.
EDSServiceName string
// LRS server to send load reports to. If not present, load reporting
// will be disabled. If set to the empty string, load reporting will
// be sent to the same server that we obtained CDS data from.
LrsLoadReportingServerName string
}
// xdsConfigJSON is the intermediate unmarshal result of XDSConfig. ChildPolicy
// and Fallbackspolicy are post-processed, and for each, the first installed
// policy is kept.
type xdsConfigJSON struct {
BalancerName string
ChildPolicy []*loadBalancingConfig
FallbackPolicy []*loadBalancingConfig
EDSServiceName string
LRSLoadReportingServerName string
}
// UnmarshalJSON parses the JSON-encoded byte slice in data and stores it in l.
// When unmarshalling, we iterate through the childPolicy/fallbackPolicy lists
// and select the first LB policy which has been registered.
func (l *XDSConfig) UnmarshalJSON(data []byte) error {
var configJSON xdsConfigJSON
if err := json.Unmarshal(data, &configJSON); err != nil {
return err
}
l.BalancerName = configJSON.BalancerName
l.EDSServiceName = configJSON.EDSServiceName
l.LrsLoadReportingServerName = configJSON.LRSLoadReportingServerName
for _, lbcfg := range configJSON.ChildPolicy {
if balancer.Get(lbcfg.Name) != nil {
l.ChildPolicy = lbcfg
break
}
}
for _, lbcfg := range configJSON.FallbackPolicy {
if balancer.Get(lbcfg.Name) != nil {
l.FallBackPolicy = lbcfg
break
}
}
return nil
}
// MarshalJSON returns a JSON encoding of l.
func (l *XDSConfig) MarshalJSON() ([]byte, error) {
return nil, fmt.Errorf("XDSConfig.MarshalJSON() is unimplemented")
}
// loadBalancingConfig represents a single load balancing config,
// stored in JSON format.
type loadBalancingConfig struct {
Name string
Config json.RawMessage
}
// MarshalJSON returns a JSON encoding of l.
func (l *loadBalancingConfig) MarshalJSON() ([]byte, error) {
return nil, fmt.Errorf("loadBalancingConfig.MarshalJSON() is unimplemented")
}
// UnmarshalJSON parses the JSON-encoded byte slice in data and stores it in l.
func (l *loadBalancingConfig) UnmarshalJSON(data []byte) error {
var cfg map[string]json.RawMessage
if err := json.Unmarshal(data, &cfg); err != nil {
return err
}
for name, config := range cfg {
l.Name = name
l.Config = config
}
return nil
}

View File

@ -34,7 +34,6 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
"google.golang.org/grpc/xds/internal/balancer/lrs"
)
@ -89,7 +88,7 @@ func (b *xdsBalancerBuilder) Name() string {
}
func (b *xdsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
var cfg xdsinternal.LBConfig
var cfg XDSConfig
if err := json.Unmarshal(c, &cfg); err != nil {
return nil, fmt.Errorf("unable to unmarshal balancer config %s into xds config", string(c))
}
@ -133,14 +132,14 @@ type xdsBalancer struct {
noSubConnAlert <-chan struct{}
client *client // may change when passed a different service config
config *xdsinternal.LBConfig // may change when passed a different service config
config *XDSConfig // may change when passed a different service config
xdsLB edsBalancerInterface
fallbackLB balancer.Balancer
fallbackInitData *resolver.State // may change when HandleResolved address is called
loadStore lrs.Store
}
func (x *xdsBalancer) startNewXDSClient(u *xdsinternal.LBConfig) {
func (x *xdsBalancer) startNewXDSClient(u *XDSConfig) {
// If the xdsBalancer is in startup stage, then we need to apply the startup timeout for the first
// xdsClient to get a response from the traffic director.
if x.startup {
@ -192,7 +191,7 @@ func (x *xdsBalancer) startNewXDSClient(u *xdsinternal.LBConfig) {
prevClient.close()
}
}
x.client = newXDSClient(u.BalancerName, x.buildOpts, x.loadStore, newADS, loseContact, exitCleanup)
x.client = newXDSClient(u.BalancerName, u.EDSServiceName, x.buildOpts, x.loadStore, newADS, loseContact, exitCleanup)
go x.client.run()
}
@ -239,7 +238,7 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) {
}
}
case *balancer.ClientConnState:
cfg, _ := u.BalancerConfig.(*xdsinternal.LBConfig)
cfg, _ := u.BalancerConfig.(*XDSConfig)
if cfg == nil {
// service config parsing failed. should never happen.
return
@ -248,6 +247,11 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) {
var fallbackChanged bool
// service config has been updated.
if !reflect.DeepEqual(cfg, x.config) {
// TODO: handle cfg.LrsLoadReportingServerName and remove log.
if cfg.LrsLoadReportingServerName != "" {
grpclog.Warningf("xds: lrsLoadReportingServerName is not empty, but is not handled")
}
if x.config == nil {
// The first time we get config, we just need to start the xdsClient.
x.startNewXDSClient(cfg)
@ -483,16 +487,16 @@ func (x *xdsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() {
}
}
func (x *xdsBalancer) buildFallBackBalancer(c *xdsinternal.LBConfig) {
func (x *xdsBalancer) buildFallBackBalancer(c *XDSConfig) {
if c.FallBackPolicy == nil {
x.buildFallBackBalancer(&xdsinternal.LBConfig{
FallBackPolicy: &xdsinternal.LoadBalancingConfig{
x.buildFallBackBalancer(&XDSConfig{
FallBackPolicy: &loadBalancingConfig{
Name: "round_robin",
},
})
return
}
// builder will always be non-nil, since when parse JSON into xdsinternal.LBConfig, we check whether the specified
// builder will always be non-nil, since when parse JSON into xdsinternal.XDSConfig, we check whether the specified
// balancer is registered or not.
builder := balancer.Get(c.FallBackPolicy.Name)

View File

@ -213,9 +213,9 @@ func (c *client) adsCallAttempt() (firstRespReceived bool) {
}
}
func newXDSClient(balancerName string, opts balancer.BuildOptions, loadStore lrs.Store, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client {
func newXDSClient(balancerName string, edsServiceName string, opts balancer.BuildOptions, loadStore lrs.Store, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client {
c := &client{
serviceName: opts.Target.Endpoint,
serviceName: edsServiceName,
dialer: opts.Dialer,
channelzParentID: opts.ChannelzParentID,
newADS: newADS,
@ -225,6 +225,10 @@ func newXDSClient(balancerName string, opts balancer.BuildOptions, loadStore lrs
loadStore: loadStore,
}
if c.serviceName == "" {
c.serviceName = opts.Target.Endpoint
}
c.ctx, c.cancel = context.WithCancel(context.Background())
// It is possible that NewConfig returns a Config object with certain

View File

@ -44,6 +44,7 @@ import (
var (
testServiceName = "test/foo"
testEDSServiceName = "test/service/eds"
testEDSReq = &xdspb.DiscoveryRequest{
TypeUrl: edsType,
ResourceNames: []string{testServiceName},
@ -177,6 +178,7 @@ func newTestTrafficDirector() *testTrafficDirector {
}
type testConfig struct {
edsServiceName string
expectedRequests []*xdspb.DiscoveryRequest
responsesToSend []*xdspb.DiscoveryResponse
expectedADSResponses []proto.Message
@ -208,11 +210,22 @@ func setupServer(t *testing.T) (addr string, td *testTrafficDirector, lrss *lrsS
}
func (s) TestXdsClientResponseHandling(t *testing.T) {
for _, test := range []*testConfig{{
for _, test := range []*testConfig{
{
expectedRequests: []*xdspb.DiscoveryRequest{testEDSReq},
responsesToSend: []*xdspb.DiscoveryResponse{testEDSResp},
expectedADSResponses: []proto.Message{testClusterLoadAssignment},
}} {
},
{
edsServiceName: testEDSServiceName,
expectedRequests: []*xdspb.DiscoveryRequest{{
TypeUrl: edsType,
ResourceNames: []string{testEDSServiceName},
}},
responsesToSend: []*xdspb.DiscoveryResponse{testEDSResp},
expectedADSResponses: []proto.Message{testClusterLoadAssignment},
},
} {
testXdsClientResponseHandling(t, test)
}
}
@ -225,7 +238,7 @@ func testXdsClientResponseHandling(t *testing.T, test *testConfig) {
adsChan <- i
return nil
}
client := newXDSClient(addr, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, func(context.Context) {}, func() {})
client := newXDSClient(addr, test.edsServiceName, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, func(context.Context) {}, func() {})
defer client.close()
go client.run()
@ -282,7 +295,7 @@ func testXdsClientLoseContactRemoteClose(t *testing.T, test *testConfig) {
loseContactFunc := func(context.Context) {
contactChan <- &loseContact{}
}
client := newXDSClient(addr, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, loseContactFunc, func() {})
client := newXDSClient(addr, test.edsServiceName, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, loseContactFunc, func() {})
defer client.close()
go client.run()
@ -316,7 +329,7 @@ func testXdsClientLoseContactADSRelatedErrorOccur(t *testing.T, test *testConfig
loseContactFunc := func(context.Context) {
contactChan <- &loseContact{}
}
client := newXDSClient(addr, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, loseContactFunc, func() {})
client := newXDSClient(addr, test.edsServiceName, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, loseContactFunc, func() {})
defer client.close()
go client.run()
@ -350,7 +363,7 @@ func (s) TestXdsClientExponentialRetry(t *testing.T) {
loseContactFunc := func(context.Context) {
contactChan <- &loseContact{}
}
client := newXDSClient(addr, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, loseContactFunc, func() {})
client := newXDSClient(addr, "", balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, newADS, loseContactFunc, func() {})
defer client.close()
go client.run()

View File

@ -34,7 +34,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
xdsinternal "google.golang.org/grpc/xds/internal"
)
type lrsServer struct {
@ -105,7 +104,7 @@ func (s) TestXdsLoadReporting(t *testing.T) {
Nanos: intervalNano,
}
cfg := &xdsinternal.LBConfig{
cfg := &XDSConfig{
BalancerName: addr,
}
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg})

View File

@ -19,6 +19,7 @@
package balancer
import (
"bytes"
"encoding/json"
"reflect"
"sync"
@ -26,13 +27,17 @@ import (
"time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/leakcheck"
scpb "google.golang.org/grpc/internal/proto/grpc_service_config"
"google.golang.org/grpc/resolver"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/lrs"
)
@ -61,10 +66,10 @@ const (
var (
testBalancerNameFooBar = "foo.bar"
testLBConfigFooBar = &xdsinternal.LBConfig{
testLBConfigFooBar = &XDSConfig{
BalancerName: testBalancerNameFooBar,
ChildPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerB},
FallBackPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerB},
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
}
specialAddrForBalancerA = resolver.Address{Addr: "this.is.balancer.A"}
@ -174,8 +179,8 @@ type scStateChange struct {
type fakeEDSBalancer struct {
cc balancer.ClientConn
edsChan chan *xdspb.ClusterLoadAssignment
childPolicy chan *xdsinternal.LoadBalancingConfig
fallbackPolicy chan *xdsinternal.LoadBalancingConfig
childPolicy chan *loadBalancingConfig
fallbackPolicy chan *loadBalancingConfig
subconnStateChange chan *scStateChange
loadStore lrs.Store
}
@ -195,7 +200,7 @@ func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment
}
func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) {
f.childPolicy <- &xdsinternal.LoadBalancingConfig{
f.childPolicy <- &loadBalancingConfig{
Name: name,
Config: config,
}
@ -205,8 +210,8 @@ func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancer
lb := &fakeEDSBalancer{
cc: cc,
edsChan: make(chan *xdspb.ClusterLoadAssignment, 10),
childPolicy: make(chan *xdsinternal.LoadBalancingConfig, 10),
fallbackPolicy: make(chan *xdsinternal.LoadBalancingConfig, 10),
childPolicy: make(chan *loadBalancingConfig, 10),
fallbackPolicy: make(chan *loadBalancingConfig, 10),
subconnStateChange: make(chan *scStateChange, 10),
loadStore: loadStore,
}
@ -328,10 +333,10 @@ func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) {
for i := 0; i < 2; i++ {
addr, td, _, cleanup := setupServer(t)
cleanups = append(cleanups, cleanup)
workingLBConfig := &xdsinternal.LBConfig{
workingLBConfig := &XDSConfig{
BalancerName: addr,
ChildPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
}
lb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs},
@ -383,31 +388,31 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) {
}
}()
for _, test := range []struct {
cfg *xdsinternal.LBConfig
cfg *XDSConfig
responseToSend *xdspb.DiscoveryResponse
expectedChildPolicy *xdsinternal.LoadBalancingConfig
expectedChildPolicy *loadBalancingConfig
}{
{
cfg: &xdsinternal.LBConfig{
ChildPolicy: &xdsinternal.LoadBalancingConfig{
cfg: &XDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: fakeBalancerA,
Config: json.RawMessage("{}"),
},
},
responseToSend: testEDSResp,
expectedChildPolicy: &xdsinternal.LoadBalancingConfig{
expectedChildPolicy: &loadBalancingConfig{
Name: string(fakeBalancerA),
Config: json.RawMessage(`{}`),
},
},
{
cfg: &xdsinternal.LBConfig{
ChildPolicy: &xdsinternal.LoadBalancingConfig{
cfg: &XDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: fakeBalancerB,
Config: json.RawMessage("{}"),
},
},
expectedChildPolicy: &xdsinternal.LoadBalancingConfig{
expectedChildPolicy: &loadBalancingConfig{
Name: string(fakeBalancerB),
Config: json.RawMessage(`{}`),
},
@ -461,16 +466,16 @@ func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) {
addr, td, _, cleanup := setupServer(t)
cfg := xdsinternal.LBConfig{
cfg := XDSConfig{
BalancerName: addr,
ChildPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
}
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: &cfg})
addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
cfg2 := cfg
cfg2.FallBackPolicy = &xdsinternal.LoadBalancingConfig{Name: fakeBalancerB}
cfg2.FallBackPolicy = &loadBalancingConfig{Name: fakeBalancerB}
lb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: &cfg2,
@ -502,7 +507,7 @@ func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) {
}
cfg3 := cfg
cfg3.FallBackPolicy = &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA}
cfg3.FallBackPolicy = &loadBalancingConfig{Name: fakeBalancerA}
lb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: &cfg3,
@ -536,10 +541,10 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) {
addr, td, _, cleanup := setupServer(t)
defer cleanup()
cfg := &xdsinternal.LBConfig{
cfg := &XDSConfig{
BalancerName: addr,
ChildPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
}
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg})
@ -614,10 +619,10 @@ func (s) TestXdsBalancerFallBackSignalFromEdsBalancer(t *testing.T) {
addr, td, _, cleanup := setupServer(t)
defer cleanup()
cfg := &xdsinternal.LBConfig{
cfg := &XDSConfig{
BalancerName: addr,
ChildPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
}
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg})
@ -675,43 +680,119 @@ func (s) TestXdsBalancerFallBackSignalFromEdsBalancer(t *testing.T) {
}
}
func (s) TestXdsBalancerConfigParsingSelectingLBPolicy(t *testing.T) {
js := json.RawMessage(`{
"balancerName": "fake.foo.bar",
"childPolicy": [{"fake_balancer_C": {}}, {"fake_balancer_A": {}}, {"fake_balancer_B": {}}],
"fallbackPolicy": [{"fake_balancer_C": {}}, {"fake_balancer_B": {}}, {"fake_balancer_A": {}}]
}`)
cfg, err := (&xdsBalancerBuilder{}).ParseConfig(js)
if err != nil {
t.Fatalf("unable to unmarshal balancer config into xds config: %v", err)
}
xdsCfg := cfg.(*xdsinternal.LBConfig)
wantChildPolicy := &xdsinternal.LoadBalancingConfig{Name: string(fakeBalancerA), Config: json.RawMessage(`{}`)}
if !reflect.DeepEqual(xdsCfg.ChildPolicy, wantChildPolicy) {
t.Fatalf("got child policy %v, want %v", xdsCfg.ChildPolicy, wantChildPolicy)
}
wantFallbackPolicy := &xdsinternal.LoadBalancingConfig{Name: string(fakeBalancerB), Config: json.RawMessage(`{}`)}
if !reflect.DeepEqual(xdsCfg.FallBackPolicy, wantFallbackPolicy) {
t.Fatalf("got fallback policy %v, want %v", xdsCfg.FallBackPolicy, wantFallbackPolicy)
}
func TestXdsBalancerConfigParsing(t *testing.T) {
const (
testEDSName = "eds.service"
testLRSName = "lrs.server"
)
b := bytes.NewBuffer(nil)
if err := (&jsonpb.Marshaler{}).Marshal(b, &scpb.XdsConfig{
ChildPolicy: []*scpb.LoadBalancingConfig{
{Policy: &scpb.LoadBalancingConfig_Xds{}},
{Policy: &scpb.LoadBalancingConfig_RoundRobin{
RoundRobin: &scpb.RoundRobinConfig{},
}},
},
FallbackPolicy: []*scpb.LoadBalancingConfig{
{Policy: &scpb.LoadBalancingConfig_Xds{}},
{Policy: &scpb.LoadBalancingConfig_PickFirst{
PickFirst: &scpb.PickFirstConfig{},
}},
},
EdsServiceName: testEDSName,
LrsLoadReportingServerName: &wrapperspb.StringValue{Value: testLRSName},
}); err != nil {
t.Fatalf("%v", err)
}
func (s) TestXdsLoadbalancingConfigParsing(t *testing.T) {
tests := []struct {
name string
js json.RawMessage
want serviceconfig.LoadBalancingConfig
wantErr bool
}{
{
name: "jsonpb-generated",
js: b.Bytes(),
want: &XDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: "round_robin",
Config: json.RawMessage("{}"),
},
FallBackPolicy: &loadBalancingConfig{
Name: "pick_first",
Config: json.RawMessage("{}"),
},
EDSServiceName: testEDSName,
LrsLoadReportingServerName: testLRSName,
},
wantErr: false,
},
{
// json with random balancers, and the first is not registered.
name: "manually-generated",
js: json.RawMessage(`
{
"balancerName": "fake.foo.bar",
"childPolicy": [
{"fake_balancer_C": {}},
{"fake_balancer_A": {}},
{"fake_balancer_B": {}}
],
"fallbackPolicy": [
{"fake_balancer_C": {}},
{"fake_balancer_B": {}},
{"fake_balancer_A": {}}
],
"edsServiceName": "eds.service",
"lrsLoadReportingServerName": "lrs.server"
}`),
want: &XDSConfig{
BalancerName: "fake.foo.bar",
ChildPolicy: &loadBalancingConfig{
Name: "fake_balancer_A",
Config: json.RawMessage("{}"),
},
FallBackPolicy: &loadBalancingConfig{
Name: "fake_balancer_B",
Config: json.RawMessage("{}"),
},
EDSServiceName: testEDSName,
LrsLoadReportingServerName: testLRSName,
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := &xdsBalancerBuilder{}
got, err := b.ParseConfig(tt.js)
if (err != nil) != tt.wantErr {
t.Errorf("xdsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !cmp.Equal(got, tt.want) {
t.Errorf(cmp.Diff(got, tt.want))
}
})
}
}
func TestLoadbalancingConfigParsing(t *testing.T) {
tests := []struct {
name string
s string
want *xdsinternal.LBConfig
want *XDSConfig
}{
{
name: "empty",
s: "{}",
want: &xdsinternal.LBConfig{},
want: &XDSConfig{},
},
{
name: "success1",
s: `{"childPolicy":[{"pick_first":{}}]}`,
want: &xdsinternal.LBConfig{
ChildPolicy: &xdsinternal.LoadBalancingConfig{
want: &XDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: "pick_first",
Config: json.RawMessage(`{}`),
},
@ -720,8 +801,8 @@ func (s) TestXdsLoadbalancingConfigParsing(t *testing.T) {
{
name: "success2",
s: `{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`,
want: &xdsinternal.LBConfig{
ChildPolicy: &xdsinternal.LoadBalancingConfig{
want: &XDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: "round_robin",
Config: json.RawMessage(`{}`),
},
@ -730,7 +811,7 @@ func (s) TestXdsLoadbalancingConfigParsing(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var cfg xdsinternal.LBConfig
var cfg XDSConfig
if err := json.Unmarshal([]byte(tt.s), &cfg); err != nil || !reflect.DeepEqual(&cfg, tt.want) {
t.Errorf("test name: %s, parseFullServiceConfig() = %+v, err: %v, want %+v, <nil>", tt.name, cfg, err, tt.want)
}

View File

@ -18,12 +18,9 @@
package internal
import (
"encoding/json"
"fmt"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/serviceconfig"
)
// Locality is xds.Locality without XXX fields, so it can be used as map
@ -51,88 +48,3 @@ func (lamk Locality) ToProto() *corepb.Locality {
SubZone: lamk.SubZone,
}
}
// LBConfig represents the loadBalancingConfig section of the service config
// for xDS balancers.
type LBConfig struct {
serviceconfig.LoadBalancingConfig
// BalancerName represents the load balancer to use.
BalancerName string
// ChildPolicy represents the load balancing config for the child policy.
ChildPolicy *LoadBalancingConfig
// FallBackPolicy represents the load balancing config for the fallback.
FallBackPolicy *LoadBalancingConfig
}
// UnmarshalJSON parses the JSON-encoded byte slice in data and stores it in l.
// When unmarshalling, we iterate through the childPolicy/fallbackPolicy lists
// and select the first LB policy which has been registered.
func (l *LBConfig) UnmarshalJSON(data []byte) error {
var val map[string]json.RawMessage
if err := json.Unmarshal(data, &val); err != nil {
return err
}
for k, v := range val {
switch k {
case "balancerName":
if err := json.Unmarshal(v, &l.BalancerName); err != nil {
return err
}
case "childPolicy":
var lbcfgs []*LoadBalancingConfig
if err := json.Unmarshal(v, &lbcfgs); err != nil {
return err
}
for _, lbcfg := range lbcfgs {
if balancer.Get(lbcfg.Name) != nil {
l.ChildPolicy = lbcfg
break
}
}
case "fallbackPolicy":
var lbcfgs []*LoadBalancingConfig
if err := json.Unmarshal(v, &lbcfgs); err != nil {
return err
}
for _, lbcfg := range lbcfgs {
if balancer.Get(lbcfg.Name) != nil {
l.FallBackPolicy = lbcfg
break
}
}
}
}
return nil
}
// MarshalJSON returns a JSON encoding of l.
func (l *LBConfig) MarshalJSON() ([]byte, error) {
return nil, nil
}
// LoadBalancingConfig represents a single load balancing config,
// stored in JSON format.
type LoadBalancingConfig struct {
Name string
Config json.RawMessage
}
// MarshalJSON returns a JSON encoding of l.
func (l *LoadBalancingConfig) MarshalJSON() ([]byte, error) {
m := make(map[string]json.RawMessage)
m[l.Name] = l.Config
return json.Marshal(m)
}
// UnmarshalJSON parses the JSON-encoded byte slice in data and stores it in l.
func (l *LoadBalancingConfig) UnmarshalJSON(data []byte) error {
var cfg map[string]json.RawMessage
if err := json.Unmarshal(data, &cfg); err != nil {
return err
}
for name, config := range cfg {
l.Name = name
l.Config = config
}
return nil
}

View File

@ -31,7 +31,7 @@ import (
"google.golang.org/grpc/internal"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
xdsinternal "google.golang.org/grpc/xds/internal"
xdsbalancer "google.golang.org/grpc/xds/internal/balancer"
)
// This is initialized at init time.
@ -118,7 +118,7 @@ func (*fakeBalancer) ResolverError(error) {
panic("unimplemented")
}
// UpdateClientConnState verifies that the received LBConfig matches the
// UpdateClientConnState verifies that the received edsConfig matches the
// provided one, and if not, sends an error on the provided channel.
func (f *fakeBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
gotLBConfig, ok := ccs.BalancerConfig.(*wrappedLBConfig)
@ -127,7 +127,7 @@ func (f *fakeBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
return balancer.ErrBadResolverState
}
var gotCfg, wantCfg xdsinternal.LBConfig
var gotCfg, wantCfg xdsbalancer.XDSConfig
if err := wantCfg.UnmarshalJSON(f.wantLBConfig.lbCfg); err != nil {
f.errCh <- fmt.Errorf("unable to unmarshal balancer config %s into xds config", string(f.wantLBConfig.lbCfg))
return balancer.ErrBadResolverState
@ -136,6 +136,7 @@ func (f *fakeBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
f.errCh <- fmt.Errorf("unable to unmarshal balancer config %s into xds config", string(gotLBConfig.lbCfg))
return balancer.ErrBadResolverState
}
if !reflect.DeepEqual(gotCfg, wantCfg) {
f.errCh <- fmt.Errorf("in fakeBalancer got lbConfig %v, want %v", gotCfg, wantCfg)
return balancer.ErrBadResolverState
@ -171,7 +172,7 @@ func (f *fakeBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.Load
}
// wrappedLBConfig simply wraps the provided LB config with a
// serviceconfig.LoadBalancingConfig interface.
// serviceconfig.loadBalancingConfig interface.
type wrappedLBConfig struct {
serviceconfig.LoadBalancingConfig
lbCfg json.RawMessage