Implementation of the xds_experimental resolver. (#2967)
This resolver doesn't do much at this point, except returning an empty address list and a hard-coded service config which picks the xds balancer with a round_robin child policy. Also moved the xdsConfig struct to the xds/internal package and exported it as LBConfig, so that both the resolver and the balancer packages can make use of this.
This commit is contained in:

committed by
GitHub

parent
3bb34e5324
commit
dc1875474e
1
vet.sh
1
vet.sh
@ -111,6 +111,7 @@ google.golang.org/grpc/balancer.go:SA1019
|
||||
google.golang.org/grpc/balancer/grpclb/grpclb_remote_balancer.go:SA1019
|
||||
google.golang.org/grpc/balancer/roundrobin/roundrobin_test.go:SA1019
|
||||
google.golang.org/grpc/xds/internal/balancer/edsbalancer/balancergroup.go:SA1019
|
||||
google.golang.org/grpc/xds/internal/resolver/xds_resolver.go:SA1019
|
||||
google.golang.org/grpc/xds/internal/balancer/xds.go:SA1019
|
||||
google.golang.org/grpc/xds/internal/balancer/xds_client.go:SA1019
|
||||
google.golang.org/grpc/balancer_conn_wrappers.go:SA1019
|
||||
|
@ -24,9 +24,12 @@ package experimental
|
||||
|
||||
import (
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/resolver"
|
||||
xdsbalancer "google.golang.org/grpc/xds/internal/balancer"
|
||||
xdsresolver "google.golang.org/grpc/xds/internal/resolver"
|
||||
)
|
||||
|
||||
func init() {
|
||||
resolver.Register(xdsresolver.NewBuilder())
|
||||
balancer.Register(xdsbalancer.NewBalancerBuilder())
|
||||
}
|
||||
|
@ -33,6 +33,7 @@ import (
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/serviceconfig"
|
||||
xdsinternal "google.golang.org/grpc/xds/internal"
|
||||
"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
|
||||
"google.golang.org/grpc/xds/internal/balancer/lrs"
|
||||
cdspb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/cds"
|
||||
@ -89,7 +90,7 @@ func (b *xdsBalancerBuilder) Name() string {
|
||||
}
|
||||
|
||||
func (b *xdsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
|
||||
var cfg xdsConfig
|
||||
var cfg xdsinternal.LBConfig
|
||||
if err := json.Unmarshal(c, &cfg); err != nil {
|
||||
return nil, fmt.Errorf("unable to unmarshal balancer config %s into xds config", string(c))
|
||||
}
|
||||
@ -130,15 +131,15 @@ type xdsBalancer struct {
|
||||
timer *time.Timer
|
||||
noSubConnAlert <-chan struct{}
|
||||
|
||||
client *client // may change when passed a different service config
|
||||
config *xdsConfig // may change when passed a different service config
|
||||
client *client // may change when passed a different service config
|
||||
config *xdsinternal.LBConfig // may change when passed a different service config
|
||||
xdsLB edsBalancerInterface
|
||||
fallbackLB balancer.Balancer
|
||||
fallbackInitData *resolver.State // may change when HandleResolved address is called
|
||||
loadStore lrs.Store
|
||||
}
|
||||
|
||||
func (x *xdsBalancer) startNewXDSClient(u *xdsConfig) {
|
||||
func (x *xdsBalancer) startNewXDSClient(u *xdsinternal.LBConfig) {
|
||||
// If the xdsBalancer is in startup stage, then we need to apply the startup timeout for the first
|
||||
// xdsClient to get a response from the traffic director.
|
||||
if x.startup {
|
||||
@ -237,7 +238,7 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) {
|
||||
}
|
||||
}
|
||||
case *balancer.ClientConnState:
|
||||
cfg, _ := u.BalancerConfig.(*xdsConfig)
|
||||
cfg, _ := u.BalancerConfig.(*xdsinternal.LBConfig)
|
||||
if cfg == nil {
|
||||
// service config parsing failed. should never happen.
|
||||
return
|
||||
@ -497,16 +498,16 @@ func (x *xdsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() {
|
||||
}
|
||||
}
|
||||
|
||||
func (x *xdsBalancer) buildFallBackBalancer(c *xdsConfig) {
|
||||
func (x *xdsBalancer) buildFallBackBalancer(c *xdsinternal.LBConfig) {
|
||||
if c.FallBackPolicy == nil {
|
||||
x.buildFallBackBalancer(&xdsConfig{
|
||||
FallBackPolicy: &loadBalancingConfig{
|
||||
x.buildFallBackBalancer(&xdsinternal.LBConfig{
|
||||
FallBackPolicy: &xdsinternal.LoadBalancingConfig{
|
||||
Name: "round_robin",
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
// builder will always be non-nil, since when parse JSON into xdsConfig, we check whether the specified
|
||||
// builder will always be non-nil, since when parse JSON into xdsinternal.LBConfig, we check whether the specified
|
||||
// balancer is registered or not.
|
||||
builder := balancer.Get(c.FallBackPolicy.Name)
|
||||
|
||||
@ -566,77 +567,3 @@ func createDrainedTimer() *time.Timer {
|
||||
}
|
||||
return timer
|
||||
}
|
||||
|
||||
type xdsConfig struct {
|
||||
serviceconfig.LoadBalancingConfig
|
||||
BalancerName string
|
||||
ChildPolicy *loadBalancingConfig
|
||||
FallBackPolicy *loadBalancingConfig
|
||||
}
|
||||
|
||||
// When unmarshalling json to xdsConfig, we iterate through the childPolicy/fallbackPolicy lists
|
||||
// and select the first LB policy which has been registered to be stored in the returned xdsConfig.
|
||||
func (p *xdsConfig) UnmarshalJSON(data []byte) error {
|
||||
var val map[string]json.RawMessage
|
||||
if err := json.Unmarshal(data, &val); err != nil {
|
||||
return err
|
||||
}
|
||||
for k, v := range val {
|
||||
switch k {
|
||||
case "balancerName":
|
||||
if err := json.Unmarshal(v, &p.BalancerName); err != nil {
|
||||
return err
|
||||
}
|
||||
case "childPolicy":
|
||||
var lbcfgs []*loadBalancingConfig
|
||||
if err := json.Unmarshal(v, &lbcfgs); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, lbcfg := range lbcfgs {
|
||||
if balancer.Get(lbcfg.Name) != nil {
|
||||
p.ChildPolicy = lbcfg
|
||||
break
|
||||
}
|
||||
}
|
||||
case "fallbackPolicy":
|
||||
var lbcfgs []*loadBalancingConfig
|
||||
if err := json.Unmarshal(v, &lbcfgs); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, lbcfg := range lbcfgs {
|
||||
if balancer.Get(lbcfg.Name) != nil {
|
||||
p.FallBackPolicy = lbcfg
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *xdsConfig) MarshalJSON() ([]byte, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type loadBalancingConfig struct {
|
||||
Name string
|
||||
Config json.RawMessage
|
||||
}
|
||||
|
||||
func (l *loadBalancingConfig) MarshalJSON() ([]byte, error) {
|
||||
m := make(map[string]json.RawMessage)
|
||||
m[l.Name] = l.Config
|
||||
return json.Marshal(m)
|
||||
}
|
||||
|
||||
func (l *loadBalancingConfig) UnmarshalJSON(data []byte) error {
|
||||
var cfg map[string]json.RawMessage
|
||||
if err := json.Unmarshal(data, &cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
for name, config := range cfg {
|
||||
l.Name = name
|
||||
l.Config = config
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -33,6 +33,7 @@ import (
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/grpc/xds/internal"
|
||||
xdsinternal "google.golang.org/grpc/xds/internal"
|
||||
basepb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/core/base"
|
||||
lrsgrpc "google.golang.org/grpc/xds/internal/proto/envoy/service/load_stats/v2/lrs"
|
||||
lrspb "google.golang.org/grpc/xds/internal/proto/envoy/service/load_stats/v2/lrs"
|
||||
@ -112,9 +113,9 @@ func (s) TestXdsLoadReporting(t *testing.T) {
|
||||
Nanos: intervalNano,
|
||||
}
|
||||
|
||||
cfg := &xdsConfig{
|
||||
cfg := &xdsinternal.LBConfig{
|
||||
BalancerName: addr,
|
||||
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, // Set this to skip cds.
|
||||
ChildPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA}, // Set this to skip cds.
|
||||
}
|
||||
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg})
|
||||
td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
|
||||
|
@ -31,6 +31,7 @@ import (
|
||||
"google.golang.org/grpc/internal/grpctest"
|
||||
"google.golang.org/grpc/internal/leakcheck"
|
||||
"google.golang.org/grpc/resolver"
|
||||
xdsinternal "google.golang.org/grpc/xds/internal"
|
||||
"google.golang.org/grpc/xds/internal/balancer/lrs"
|
||||
discoverypb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/discovery"
|
||||
edspb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/eds"
|
||||
@ -62,10 +63,10 @@ const (
|
||||
|
||||
var (
|
||||
testBalancerNameFooBar = "foo.bar"
|
||||
testLBConfigFooBar = &xdsConfig{
|
||||
testLBConfigFooBar = &xdsinternal.LBConfig{
|
||||
BalancerName: testBalancerNameFooBar,
|
||||
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
|
||||
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
|
||||
ChildPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
|
||||
FallBackPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
|
||||
}
|
||||
|
||||
specialAddrForBalancerA = resolver.Address{Addr: "this.is.balancer.A"}
|
||||
@ -178,8 +179,8 @@ type scStateChange struct {
|
||||
type fakeEDSBalancer struct {
|
||||
cc balancer.ClientConn
|
||||
edsChan chan *edspb.ClusterLoadAssignment
|
||||
childPolicy chan *loadBalancingConfig
|
||||
fallbackPolicy chan *loadBalancingConfig
|
||||
childPolicy chan *xdsinternal.LoadBalancingConfig
|
||||
fallbackPolicy chan *xdsinternal.LoadBalancingConfig
|
||||
subconnStateChange chan *scStateChange
|
||||
loadStore lrs.Store
|
||||
}
|
||||
@ -199,7 +200,7 @@ func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *edspb.ClusterLoadAssignment
|
||||
}
|
||||
|
||||
func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) {
|
||||
f.childPolicy <- &loadBalancingConfig{
|
||||
f.childPolicy <- &xdsinternal.LoadBalancingConfig{
|
||||
Name: name,
|
||||
Config: config,
|
||||
}
|
||||
@ -209,8 +210,8 @@ func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancer
|
||||
lb := &fakeEDSBalancer{
|
||||
cc: cc,
|
||||
edsChan: make(chan *edspb.ClusterLoadAssignment, 10),
|
||||
childPolicy: make(chan *loadBalancingConfig, 10),
|
||||
fallbackPolicy: make(chan *loadBalancingConfig, 10),
|
||||
childPolicy: make(chan *xdsinternal.LoadBalancingConfig, 10),
|
||||
fallbackPolicy: make(chan *xdsinternal.LoadBalancingConfig, 10),
|
||||
subconnStateChange: make(chan *scStateChange, 10),
|
||||
loadStore: loadStore,
|
||||
}
|
||||
@ -308,10 +309,10 @@ func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) {
|
||||
for i := 0; i < 2; i++ {
|
||||
addr, td, _, cleanup := setupServer(t)
|
||||
cleanups = append(cleanups, cleanup)
|
||||
workingLBConfig := &xdsConfig{
|
||||
workingLBConfig := &xdsinternal.LBConfig{
|
||||
BalancerName: addr,
|
||||
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
|
||||
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
|
||||
ChildPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
|
||||
FallBackPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
|
||||
}
|
||||
lb.UpdateClientConnState(balancer.ClientConnState{
|
||||
ResolverState: resolver.State{Addresses: addrs},
|
||||
@ -364,39 +365,39 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
for _, test := range []struct {
|
||||
cfg *xdsConfig
|
||||
cfg *xdsinternal.LBConfig
|
||||
responseToSend *discoverypb.DiscoveryResponse
|
||||
expectedChildPolicy *loadBalancingConfig
|
||||
expectedChildPolicy *xdsinternal.LoadBalancingConfig
|
||||
}{
|
||||
{
|
||||
cfg: &xdsConfig{
|
||||
ChildPolicy: &loadBalancingConfig{
|
||||
cfg: &xdsinternal.LBConfig{
|
||||
ChildPolicy: &xdsinternal.LoadBalancingConfig{
|
||||
Name: fakeBalancerA,
|
||||
Config: json.RawMessage("{}"),
|
||||
},
|
||||
},
|
||||
responseToSend: testEDSRespWithoutEndpoints,
|
||||
expectedChildPolicy: &loadBalancingConfig{
|
||||
expectedChildPolicy: &xdsinternal.LoadBalancingConfig{
|
||||
Name: string(fakeBalancerA),
|
||||
Config: json.RawMessage(`{}`),
|
||||
},
|
||||
},
|
||||
{
|
||||
cfg: &xdsConfig{
|
||||
ChildPolicy: &loadBalancingConfig{
|
||||
cfg: &xdsinternal.LBConfig{
|
||||
ChildPolicy: &xdsinternal.LoadBalancingConfig{
|
||||
Name: fakeBalancerB,
|
||||
Config: json.RawMessage("{}"),
|
||||
},
|
||||
},
|
||||
expectedChildPolicy: &loadBalancingConfig{
|
||||
expectedChildPolicy: &xdsinternal.LoadBalancingConfig{
|
||||
Name: string(fakeBalancerB),
|
||||
Config: json.RawMessage(`{}`),
|
||||
},
|
||||
},
|
||||
{
|
||||
cfg: &xdsConfig{},
|
||||
cfg: &xdsinternal.LBConfig{},
|
||||
responseToSend: testCDSResp,
|
||||
expectedChildPolicy: &loadBalancingConfig{
|
||||
expectedChildPolicy: &xdsinternal.LoadBalancingConfig{
|
||||
Name: "ROUND_ROBIN",
|
||||
},
|
||||
},
|
||||
@ -449,16 +450,16 @@ func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) {
|
||||
|
||||
addr, td, _, cleanup := setupServer(t)
|
||||
|
||||
cfg := xdsConfig{
|
||||
cfg := xdsinternal.LBConfig{
|
||||
BalancerName: addr,
|
||||
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
|
||||
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
|
||||
ChildPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
|
||||
FallBackPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
|
||||
}
|
||||
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: &cfg})
|
||||
|
||||
addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
|
||||
cfg2 := cfg
|
||||
cfg2.FallBackPolicy = &loadBalancingConfig{Name: fakeBalancerB}
|
||||
cfg2.FallBackPolicy = &xdsinternal.LoadBalancingConfig{Name: fakeBalancerB}
|
||||
lb.UpdateClientConnState(balancer.ClientConnState{
|
||||
ResolverState: resolver.State{Addresses: addrs},
|
||||
BalancerConfig: &cfg2,
|
||||
@ -490,7 +491,7 @@ func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) {
|
||||
}
|
||||
|
||||
cfg3 := cfg
|
||||
cfg3.FallBackPolicy = &loadBalancingConfig{Name: fakeBalancerA}
|
||||
cfg3.FallBackPolicy = &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA}
|
||||
lb.UpdateClientConnState(balancer.ClientConnState{
|
||||
ResolverState: resolver.State{Addresses: addrs},
|
||||
BalancerConfig: &cfg3,
|
||||
@ -524,10 +525,10 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) {
|
||||
|
||||
addr, td, _, cleanup := setupServer(t)
|
||||
defer cleanup()
|
||||
cfg := &xdsConfig{
|
||||
cfg := &xdsinternal.LBConfig{
|
||||
BalancerName: addr,
|
||||
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
|
||||
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
|
||||
ChildPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
|
||||
FallBackPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
|
||||
}
|
||||
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg})
|
||||
|
||||
@ -602,10 +603,10 @@ func (s) TestXdsBalancerFallBackSignalFromEdsBalancer(t *testing.T) {
|
||||
|
||||
addr, td, _, cleanup := setupServer(t)
|
||||
defer cleanup()
|
||||
cfg := &xdsConfig{
|
||||
cfg := &xdsinternal.LBConfig{
|
||||
BalancerName: addr,
|
||||
ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
|
||||
FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
|
||||
ChildPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
|
||||
FallBackPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA},
|
||||
}
|
||||
lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg})
|
||||
|
||||
@ -673,12 +674,12 @@ func (s) TestXdsBalancerConfigParsingSelectingLBPolicy(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("unable to unmarshal balancer config into xds config: %v", err)
|
||||
}
|
||||
xdsCfg := cfg.(*xdsConfig)
|
||||
wantChildPolicy := &loadBalancingConfig{Name: string(fakeBalancerA), Config: json.RawMessage(`{}`)}
|
||||
xdsCfg := cfg.(*xdsinternal.LBConfig)
|
||||
wantChildPolicy := &xdsinternal.LoadBalancingConfig{Name: string(fakeBalancerA), Config: json.RawMessage(`{}`)}
|
||||
if !reflect.DeepEqual(xdsCfg.ChildPolicy, wantChildPolicy) {
|
||||
t.Fatalf("got child policy %v, want %v", xdsCfg.ChildPolicy, wantChildPolicy)
|
||||
}
|
||||
wantFallbackPolicy := &loadBalancingConfig{Name: string(fakeBalancerB), Config: json.RawMessage(`{}`)}
|
||||
wantFallbackPolicy := &xdsinternal.LoadBalancingConfig{Name: string(fakeBalancerB), Config: json.RawMessage(`{}`)}
|
||||
if !reflect.DeepEqual(xdsCfg.FallBackPolicy, wantFallbackPolicy) {
|
||||
t.Fatalf("got fallback policy %v, want %v", xdsCfg.FallBackPolicy, wantFallbackPolicy)
|
||||
}
|
||||
@ -688,18 +689,18 @@ func (s) TestXdsLoadbalancingConfigParsing(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
s string
|
||||
want *xdsConfig
|
||||
want *xdsinternal.LBConfig
|
||||
}{
|
||||
{
|
||||
name: "empty",
|
||||
s: "{}",
|
||||
want: &xdsConfig{},
|
||||
want: &xdsinternal.LBConfig{},
|
||||
},
|
||||
{
|
||||
name: "success1",
|
||||
s: `{"childPolicy":[{"pick_first":{}}]}`,
|
||||
want: &xdsConfig{
|
||||
ChildPolicy: &loadBalancingConfig{
|
||||
want: &xdsinternal.LBConfig{
|
||||
ChildPolicy: &xdsinternal.LoadBalancingConfig{
|
||||
Name: "pick_first",
|
||||
Config: json.RawMessage(`{}`),
|
||||
},
|
||||
@ -708,8 +709,8 @@ func (s) TestXdsLoadbalancingConfigParsing(t *testing.T) {
|
||||
{
|
||||
name: "success2",
|
||||
s: `{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`,
|
||||
want: &xdsConfig{
|
||||
ChildPolicy: &loadBalancingConfig{
|
||||
want: &xdsinternal.LBConfig{
|
||||
ChildPolicy: &xdsinternal.LoadBalancingConfig{
|
||||
Name: "round_robin",
|
||||
Config: json.RawMessage(`{}`),
|
||||
},
|
||||
@ -718,7 +719,7 @@ func (s) TestXdsLoadbalancingConfigParsing(t *testing.T) {
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
var cfg xdsConfig
|
||||
var cfg xdsinternal.LBConfig
|
||||
if err := json.Unmarshal([]byte(tt.s), &cfg); err != nil || !reflect.DeepEqual(&cfg, tt.want) {
|
||||
t.Errorf("test name: %s, parseFullServiceConfig() = %+v, err: %v, want %+v, <nil>", tt.name, cfg, err, tt.want)
|
||||
}
|
||||
|
@ -18,8 +18,11 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/serviceconfig"
|
||||
basepb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/core/base"
|
||||
)
|
||||
|
||||
@ -48,3 +51,88 @@ func (lamk Locality) ToProto() *basepb.Locality {
|
||||
SubZone: lamk.SubZone,
|
||||
}
|
||||
}
|
||||
|
||||
// LBConfig represents the loadBalancingConfig section of the service config
|
||||
// for xDS balancers.
|
||||
type LBConfig struct {
|
||||
serviceconfig.LoadBalancingConfig
|
||||
// BalancerName represents the load balancer to use.
|
||||
BalancerName string
|
||||
// ChildPolicy represents the load balancing config for the child policy.
|
||||
ChildPolicy *LoadBalancingConfig
|
||||
// FallBackPolicy represents the load balancing config for the fallback.
|
||||
FallBackPolicy *LoadBalancingConfig
|
||||
}
|
||||
|
||||
// UnmarshalJSON parses the JSON-encoded byte slice in data and stores it in l.
|
||||
// When unmarshalling, we iterate through the childPolicy/fallbackPolicy lists
|
||||
// and select the first LB policy which has been registered.
|
||||
func (l *LBConfig) UnmarshalJSON(data []byte) error {
|
||||
var val map[string]json.RawMessage
|
||||
if err := json.Unmarshal(data, &val); err != nil {
|
||||
return err
|
||||
}
|
||||
for k, v := range val {
|
||||
switch k {
|
||||
case "balancerName":
|
||||
if err := json.Unmarshal(v, &l.BalancerName); err != nil {
|
||||
return err
|
||||
}
|
||||
case "childPolicy":
|
||||
var lbcfgs []*LoadBalancingConfig
|
||||
if err := json.Unmarshal(v, &lbcfgs); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, lbcfg := range lbcfgs {
|
||||
if balancer.Get(lbcfg.Name) != nil {
|
||||
l.ChildPolicy = lbcfg
|
||||
break
|
||||
}
|
||||
}
|
||||
case "fallbackPolicy":
|
||||
var lbcfgs []*LoadBalancingConfig
|
||||
if err := json.Unmarshal(v, &lbcfgs); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, lbcfg := range lbcfgs {
|
||||
if balancer.Get(lbcfg.Name) != nil {
|
||||
l.FallBackPolicy = lbcfg
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// MarshalJSON returns a JSON enconding of l.
|
||||
func (l *LBConfig) MarshalJSON() ([]byte, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// LoadBalancingConfig represents a single load balancing config,
|
||||
// stored in JSON format.
|
||||
type LoadBalancingConfig struct {
|
||||
Name string
|
||||
Config json.RawMessage
|
||||
}
|
||||
|
||||
// MarshalJSON returns a JSON enconding of l.
|
||||
func (l *LoadBalancingConfig) MarshalJSON() ([]byte, error) {
|
||||
m := make(map[string]json.RawMessage)
|
||||
m[l.Name] = l.Config
|
||||
return json.Marshal(m)
|
||||
}
|
||||
|
||||
// UnmarshalJSON parses the JSON-encoded byte slice in data and stores it in l.
|
||||
func (l *LoadBalancingConfig) UnmarshalJSON(data []byte) error {
|
||||
var cfg map[string]json.RawMessage
|
||||
if err := json.Unmarshal(data, &cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
for name, config := range cfg {
|
||||
l.Name = name
|
||||
l.Config = config
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
103
xds/internal/resolver/xds_resolver.go
Normal file
103
xds/internal/resolver/xds_resolver.go
Normal file
@ -0,0 +1,103 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2019 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
// Package resolver implements the xds resolver.
|
||||
//
|
||||
// At this point, the resolver is named xds-experimental, and doesn't do very
|
||||
// much at all, except for returning a hard-coded service config which selects
|
||||
// the xds_experimental balancer.
|
||||
package resolver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/serviceconfig"
|
||||
)
|
||||
|
||||
const (
|
||||
// The JSON form of the hard-coded service config which picks the
|
||||
// xds_experimental balancer with round_robin as the child policy.
|
||||
jsonSC = `{
|
||||
"loadBalancingConfig":[
|
||||
{
|
||||
"xds_experimental":{
|
||||
"childPolicy":[
|
||||
{
|
||||
"round_robin": {}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}`
|
||||
// xDS balancer name is xds_experimental while resolver scheme is
|
||||
// xds-experimental since "_" is not a valid character in the URL.
|
||||
xdsScheme = "xds-experimental"
|
||||
)
|
||||
|
||||
var (
|
||||
parseOnce sync.Once
|
||||
parsedSC serviceconfig.Config
|
||||
)
|
||||
|
||||
// NewBuilder creates a new implementation of the resolver.Builder interface
|
||||
// for the xDS resolver.
|
||||
func NewBuilder() resolver.Builder {
|
||||
return &xdsBuilder{}
|
||||
}
|
||||
|
||||
type xdsBuilder struct{}
|
||||
|
||||
// Build helps implement the resolver.Builder interface.
|
||||
func (b *xdsBuilder) Build(t resolver.Target, cc resolver.ClientConn, o resolver.BuildOption) (resolver.Resolver, error) {
|
||||
parseOnce.Do(func() {
|
||||
// The xds balancer must have been registered at this point for the service
|
||||
// config to be parsed properly.
|
||||
psc, err := internal.ParseServiceConfig(jsonSC)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("service config %s parsing failed: %v", jsonSC, err))
|
||||
}
|
||||
|
||||
var ok bool
|
||||
if parsedSC, ok = psc.(*grpc.ServiceConfig); !ok {
|
||||
panic(fmt.Sprintf("service config type is [%T], want [grpc.ServiceConfig]", psc))
|
||||
}
|
||||
})
|
||||
|
||||
// We return a resolver which bacically does nothing. The hard-coded service
|
||||
// config returned here picks the xds balancer.
|
||||
cc.UpdateState(resolver.State{ServiceConfig: parsedSC})
|
||||
return &xdsResolver{}, nil
|
||||
}
|
||||
|
||||
// Name helps implement the resolver.Builder interface.
|
||||
func (*xdsBuilder) Scheme() string {
|
||||
return xdsScheme
|
||||
}
|
||||
|
||||
type xdsResolver struct{}
|
||||
|
||||
// ResolveNow is a no-op at this point.
|
||||
func (*xdsResolver) ResolveNow(o resolver.ResolveNowOption) {}
|
||||
|
||||
// Close is a no-op at this point.
|
||||
func (*xdsResolver) Close() {}
|
196
xds/internal/resolver/xds_resolver_test.go
Normal file
196
xds/internal/resolver/xds_resolver_test.go
Normal file
@ -0,0 +1,196 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2019 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package resolver
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/serviceconfig"
|
||||
xdsinternal "google.golang.org/grpc/xds/internal"
|
||||
)
|
||||
|
||||
// This is initialized at init time.
|
||||
var fbb *fakeBalancerBuilder
|
||||
|
||||
// We register a fake balancer builder and the actual xds_resolver here. We use
|
||||
// the fake balancer builder to verify the service config pushed by the
|
||||
// resolver.
|
||||
func init() {
|
||||
resolver.Register(NewBuilder())
|
||||
fbb = &fakeBalancerBuilder{
|
||||
wantLBConfig: &wrappedLBConfig{lbCfg: json.RawMessage(`{
|
||||
"childPolicy":[
|
||||
{
|
||||
"round_robin": {}
|
||||
}
|
||||
]
|
||||
}`)},
|
||||
errCh: make(chan error),
|
||||
}
|
||||
balancer.Register(fbb)
|
||||
}
|
||||
|
||||
// testClientConn is a fake implemetation of resolver.ClientConn. All is does
|
||||
// is to store the state received from the resolver locally and close the
|
||||
// provided done channel.
|
||||
type testClientConn struct {
|
||||
done chan struct{}
|
||||
gotState resolver.State
|
||||
}
|
||||
|
||||
func (t *testClientConn) UpdateState(s resolver.State) {
|
||||
t.gotState = s
|
||||
close(t.done)
|
||||
}
|
||||
|
||||
func (*testClientConn) NewAddress(addresses []resolver.Address) { panic("unimplemented") }
|
||||
func (*testClientConn) NewServiceConfig(serviceConfig string) { panic("unimplemented") }
|
||||
|
||||
// TestXDSRsolverSchemeAndAddresses creates a new xds resolver, verifies that
|
||||
// it returns an empty address list and the appropriate xds-experimental
|
||||
// scheme.
|
||||
func TestXDSRsolverSchemeAndAddresses(t *testing.T) {
|
||||
b := NewBuilder()
|
||||
wantScheme := "xds-experimental"
|
||||
if b.Scheme() != wantScheme {
|
||||
t.Fatalf("got scheme %s, want %s", b.Scheme(), wantScheme)
|
||||
}
|
||||
|
||||
tcc := &testClientConn{done: make(chan struct{})}
|
||||
r, err := b.Build(resolver.Target{}, tcc, resolver.BuildOption{})
|
||||
if err != nil {
|
||||
t.Fatalf("xdsBuilder.Build() failed with error: %v", err)
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
<-tcc.done
|
||||
if len(tcc.gotState.Addresses) != 0 {
|
||||
t.Fatalf("got address list from resolver %v, want empty list", tcc.gotState.Addresses)
|
||||
}
|
||||
}
|
||||
|
||||
// fakeBalancer is used to verify that the xds_resolver returns the expected
|
||||
// serice config.
|
||||
type fakeBalancer struct {
|
||||
wantLBConfig *wrappedLBConfig
|
||||
errCh chan error
|
||||
}
|
||||
|
||||
func (*fakeBalancer) HandleSubConnStateChange(_ balancer.SubConn, _ connectivity.State) {
|
||||
panic("unimplemented")
|
||||
}
|
||||
func (*fakeBalancer) HandleResolvedAddrs(_ []resolver.Address, _ error) {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
// UpdateClientConnState verifies that the received LBConfig matches the
|
||||
// provided one, and if not, sends an error on the provided channel.
|
||||
func (f *fakeBalancer) UpdateClientConnState(ccs balancer.ClientConnState) {
|
||||
gotLBConfig, ok := ccs.BalancerConfig.(*wrappedLBConfig)
|
||||
if !ok {
|
||||
f.errCh <- fmt.Errorf("in fakeBalancer got lbConfig of type %T, want %T", ccs.BalancerConfig, &wrappedLBConfig{})
|
||||
return
|
||||
}
|
||||
|
||||
var gotCfg, wantCfg xdsinternal.LBConfig
|
||||
if err := wantCfg.UnmarshalJSON(f.wantLBConfig.lbCfg); err != nil {
|
||||
f.errCh <- fmt.Errorf("unable to unmarshal balancer config %s into xds config", string(f.wantLBConfig.lbCfg))
|
||||
return
|
||||
}
|
||||
if err := gotCfg.UnmarshalJSON(gotLBConfig.lbCfg); err != nil {
|
||||
f.errCh <- fmt.Errorf("unable to unmarshal balancer config %s into xds config", string(gotLBConfig.lbCfg))
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(gotCfg, wantCfg) {
|
||||
f.errCh <- fmt.Errorf("in fakeBalancer got lbConfig %v, want %v", gotCfg, wantCfg)
|
||||
return
|
||||
}
|
||||
|
||||
f.errCh <- nil
|
||||
}
|
||||
|
||||
func (*fakeBalancer) UpdateSubConnState(_ balancer.SubConn, _ balancer.SubConnState) {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
func (*fakeBalancer) Close() {}
|
||||
|
||||
// fakeBalancerBuilder builds a fake balancer and also provides a ParseConfig
|
||||
// method (which doesn't really the parse config, but just stores it as is).
|
||||
type fakeBalancerBuilder struct {
|
||||
wantLBConfig *wrappedLBConfig
|
||||
errCh chan error
|
||||
}
|
||||
|
||||
func (f *fakeBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
|
||||
return &fakeBalancer{f.wantLBConfig, f.errCh}
|
||||
}
|
||||
|
||||
func (f *fakeBalancerBuilder) Name() string {
|
||||
return "xds_experimental"
|
||||
}
|
||||
|
||||
func (f *fakeBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
|
||||
return &wrappedLBConfig{lbCfg: c}, nil
|
||||
}
|
||||
|
||||
// wrappedLBConfig simply wraps the provided LB config with a
|
||||
// serviceconfig.LoadBalancingConfig interface.
|
||||
type wrappedLBConfig struct {
|
||||
serviceconfig.LoadBalancingConfig
|
||||
lbCfg json.RawMessage
|
||||
}
|
||||
|
||||
// TestXDSRsolverServiceConfig verifies that the xds_resolver returns the
|
||||
// expected service config.
|
||||
//
|
||||
// The following sequence of events happen in this test:
|
||||
// * The xds_experimental balancer (fake) and resolver builders are initialized
|
||||
// at init time.
|
||||
// * We dial a dummy address here with the xds-experimental scheme. This should
|
||||
// pick the xds_resolver, which should return the hard-coded service config,
|
||||
// which should reach the fake balancer that we registered (because the
|
||||
// service config asks for the xds balancer).
|
||||
// * In the fake balancer, we verify that we receive the expected LB config.
|
||||
func TestXDSRsolverServiceConfig(t *testing.T) {
|
||||
xdsAddr := fmt.Sprintf("%s:///dummy", xdsScheme)
|
||||
cc, err := grpc.Dial(xdsAddr, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
t.Fatalf("grpc.Dial(%s) failed with error: %v", xdsAddr, err)
|
||||
}
|
||||
defer cc.Close()
|
||||
|
||||
timer := time.NewTimer(5 * time.Second)
|
||||
select {
|
||||
case <-timer.C:
|
||||
t.Fatal("timed out waiting for service config to reach balancer")
|
||||
case err := <-fbb.errCh:
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user