diff --git a/vet.sh b/vet.sh index 661e1e1d..2bdfbc8b 100755 --- a/vet.sh +++ b/vet.sh @@ -111,6 +111,7 @@ google.golang.org/grpc/balancer.go:SA1019 google.golang.org/grpc/balancer/grpclb/grpclb_remote_balancer.go:SA1019 google.golang.org/grpc/balancer/roundrobin/roundrobin_test.go:SA1019 google.golang.org/grpc/xds/internal/balancer/edsbalancer/balancergroup.go:SA1019 +google.golang.org/grpc/xds/internal/resolver/xds_resolver.go:SA1019 google.golang.org/grpc/xds/internal/balancer/xds.go:SA1019 google.golang.org/grpc/xds/internal/balancer/xds_client.go:SA1019 google.golang.org/grpc/balancer_conn_wrappers.go:SA1019 diff --git a/xds/experimental/xds_experimental.go b/xds/experimental/xds_experimental.go index 7477ea3d..ff722ad5 100644 --- a/xds/experimental/xds_experimental.go +++ b/xds/experimental/xds_experimental.go @@ -24,9 +24,12 @@ package experimental import ( "google.golang.org/grpc/balancer" + "google.golang.org/grpc/resolver" xdsbalancer "google.golang.org/grpc/xds/internal/balancer" + xdsresolver "google.golang.org/grpc/xds/internal/resolver" ) func init() { + resolver.Register(xdsresolver.NewBuilder()) balancer.Register(xdsbalancer.NewBalancerBuilder()) } diff --git a/xds/internal/balancer/xds.go b/xds/internal/balancer/xds.go index 4da30ae0..3f80d411 100644 --- a/xds/internal/balancer/xds.go +++ b/xds/internal/balancer/xds.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/grpclog" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" + xdsinternal "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/balancer/edsbalancer" "google.golang.org/grpc/xds/internal/balancer/lrs" cdspb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/cds" @@ -89,7 +90,7 @@ func (b *xdsBalancerBuilder) Name() string { } func (b *xdsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { - var cfg xdsConfig + var cfg xdsinternal.LBConfig if err := json.Unmarshal(c, &cfg); err != nil { return nil, fmt.Errorf("unable to unmarshal balancer config %s into xds config", string(c)) } @@ -130,15 +131,15 @@ type xdsBalancer struct { timer *time.Timer noSubConnAlert <-chan struct{} - client *client // may change when passed a different service config - config *xdsConfig // may change when passed a different service config + client *client // may change when passed a different service config + config *xdsinternal.LBConfig // may change when passed a different service config xdsLB edsBalancerInterface fallbackLB balancer.Balancer fallbackInitData *resolver.State // may change when HandleResolved address is called loadStore lrs.Store } -func (x *xdsBalancer) startNewXDSClient(u *xdsConfig) { +func (x *xdsBalancer) startNewXDSClient(u *xdsinternal.LBConfig) { // If the xdsBalancer is in startup stage, then we need to apply the startup timeout for the first // xdsClient to get a response from the traffic director. if x.startup { @@ -237,7 +238,7 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { } } case *balancer.ClientConnState: - cfg, _ := u.BalancerConfig.(*xdsConfig) + cfg, _ := u.BalancerConfig.(*xdsinternal.LBConfig) if cfg == nil { // service config parsing failed. should never happen. return @@ -497,16 +498,16 @@ func (x *xdsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() { } } -func (x *xdsBalancer) buildFallBackBalancer(c *xdsConfig) { +func (x *xdsBalancer) buildFallBackBalancer(c *xdsinternal.LBConfig) { if c.FallBackPolicy == nil { - x.buildFallBackBalancer(&xdsConfig{ - FallBackPolicy: &loadBalancingConfig{ + x.buildFallBackBalancer(&xdsinternal.LBConfig{ + FallBackPolicy: &xdsinternal.LoadBalancingConfig{ Name: "round_robin", }, }) return } - // builder will always be non-nil, since when parse JSON into xdsConfig, we check whether the specified + // builder will always be non-nil, since when parse JSON into xdsinternal.LBConfig, we check whether the specified // balancer is registered or not. builder := balancer.Get(c.FallBackPolicy.Name) @@ -566,77 +567,3 @@ func createDrainedTimer() *time.Timer { } return timer } - -type xdsConfig struct { - serviceconfig.LoadBalancingConfig - BalancerName string - ChildPolicy *loadBalancingConfig - FallBackPolicy *loadBalancingConfig -} - -// When unmarshalling json to xdsConfig, we iterate through the childPolicy/fallbackPolicy lists -// and select the first LB policy which has been registered to be stored in the returned xdsConfig. -func (p *xdsConfig) UnmarshalJSON(data []byte) error { - var val map[string]json.RawMessage - if err := json.Unmarshal(data, &val); err != nil { - return err - } - for k, v := range val { - switch k { - case "balancerName": - if err := json.Unmarshal(v, &p.BalancerName); err != nil { - return err - } - case "childPolicy": - var lbcfgs []*loadBalancingConfig - if err := json.Unmarshal(v, &lbcfgs); err != nil { - return err - } - for _, lbcfg := range lbcfgs { - if balancer.Get(lbcfg.Name) != nil { - p.ChildPolicy = lbcfg - break - } - } - case "fallbackPolicy": - var lbcfgs []*loadBalancingConfig - if err := json.Unmarshal(v, &lbcfgs); err != nil { - return err - } - for _, lbcfg := range lbcfgs { - if balancer.Get(lbcfg.Name) != nil { - p.FallBackPolicy = lbcfg - break - } - } - } - } - return nil -} - -func (p *xdsConfig) MarshalJSON() ([]byte, error) { - return nil, nil -} - -type loadBalancingConfig struct { - Name string - Config json.RawMessage -} - -func (l *loadBalancingConfig) MarshalJSON() ([]byte, error) { - m := make(map[string]json.RawMessage) - m[l.Name] = l.Config - return json.Marshal(m) -} - -func (l *loadBalancingConfig) UnmarshalJSON(data []byte) error { - var cfg map[string]json.RawMessage - if err := json.Unmarshal(data, &cfg); err != nil { - return err - } - for name, config := range cfg { - l.Name = name - l.Config = config - } - return nil -} diff --git a/xds/internal/balancer/xds_lrs_test.go b/xds/internal/balancer/xds_lrs_test.go index 6fe85276..77db3708 100644 --- a/xds/internal/balancer/xds_lrs_test.go +++ b/xds/internal/balancer/xds_lrs_test.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/resolver" "google.golang.org/grpc/status" "google.golang.org/grpc/xds/internal" + xdsinternal "google.golang.org/grpc/xds/internal" basepb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/core/base" lrsgrpc "google.golang.org/grpc/xds/internal/proto/envoy/service/load_stats/v2/lrs" lrspb "google.golang.org/grpc/xds/internal/proto/envoy/service/load_stats/v2/lrs" @@ -112,9 +113,9 @@ func (s) TestXdsLoadReporting(t *testing.T) { Nanos: intervalNano, } - cfg := &xdsConfig{ + cfg := &xdsinternal.LBConfig{ BalancerName: addr, - ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, // Set this to skip cds. + ChildPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA}, // Set this to skip cds. } lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg}) td.sendResp(&response{resp: testEDSRespWithoutEndpoints}) diff --git a/xds/internal/balancer/xds_test.go b/xds/internal/balancer/xds_test.go index 85bd56a1..5b344aac 100644 --- a/xds/internal/balancer/xds_test.go +++ b/xds/internal/balancer/xds_test.go @@ -31,6 +31,7 @@ import ( "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/leakcheck" "google.golang.org/grpc/resolver" + xdsinternal "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/balancer/lrs" discoverypb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/discovery" edspb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/eds" @@ -62,10 +63,10 @@ const ( var ( testBalancerNameFooBar = "foo.bar" - testLBConfigFooBar = &xdsConfig{ + testLBConfigFooBar = &xdsinternal.LBConfig{ BalancerName: testBalancerNameFooBar, - ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, - FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, + ChildPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA}, + FallBackPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA}, } specialAddrForBalancerA = resolver.Address{Addr: "this.is.balancer.A"} @@ -178,8 +179,8 @@ type scStateChange struct { type fakeEDSBalancer struct { cc balancer.ClientConn edsChan chan *edspb.ClusterLoadAssignment - childPolicy chan *loadBalancingConfig - fallbackPolicy chan *loadBalancingConfig + childPolicy chan *xdsinternal.LoadBalancingConfig + fallbackPolicy chan *xdsinternal.LoadBalancingConfig subconnStateChange chan *scStateChange loadStore lrs.Store } @@ -199,7 +200,7 @@ func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *edspb.ClusterLoadAssignment } func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) { - f.childPolicy <- &loadBalancingConfig{ + f.childPolicy <- &xdsinternal.LoadBalancingConfig{ Name: name, Config: config, } @@ -209,8 +210,8 @@ func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancer lb := &fakeEDSBalancer{ cc: cc, edsChan: make(chan *edspb.ClusterLoadAssignment, 10), - childPolicy: make(chan *loadBalancingConfig, 10), - fallbackPolicy: make(chan *loadBalancingConfig, 10), + childPolicy: make(chan *xdsinternal.LoadBalancingConfig, 10), + fallbackPolicy: make(chan *xdsinternal.LoadBalancingConfig, 10), subconnStateChange: make(chan *scStateChange, 10), loadStore: loadStore, } @@ -308,10 +309,10 @@ func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) { for i := 0; i < 2; i++ { addr, td, _, cleanup := setupServer(t) cleanups = append(cleanups, cleanup) - workingLBConfig := &xdsConfig{ + workingLBConfig := &xdsinternal.LBConfig{ BalancerName: addr, - ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, - FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, + ChildPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA}, + FallBackPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA}, } lb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: addrs}, @@ -364,39 +365,39 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) { } }() for _, test := range []struct { - cfg *xdsConfig + cfg *xdsinternal.LBConfig responseToSend *discoverypb.DiscoveryResponse - expectedChildPolicy *loadBalancingConfig + expectedChildPolicy *xdsinternal.LoadBalancingConfig }{ { - cfg: &xdsConfig{ - ChildPolicy: &loadBalancingConfig{ + cfg: &xdsinternal.LBConfig{ + ChildPolicy: &xdsinternal.LoadBalancingConfig{ Name: fakeBalancerA, Config: json.RawMessage("{}"), }, }, responseToSend: testEDSRespWithoutEndpoints, - expectedChildPolicy: &loadBalancingConfig{ + expectedChildPolicy: &xdsinternal.LoadBalancingConfig{ Name: string(fakeBalancerA), Config: json.RawMessage(`{}`), }, }, { - cfg: &xdsConfig{ - ChildPolicy: &loadBalancingConfig{ + cfg: &xdsinternal.LBConfig{ + ChildPolicy: &xdsinternal.LoadBalancingConfig{ Name: fakeBalancerB, Config: json.RawMessage("{}"), }, }, - expectedChildPolicy: &loadBalancingConfig{ + expectedChildPolicy: &xdsinternal.LoadBalancingConfig{ Name: string(fakeBalancerB), Config: json.RawMessage(`{}`), }, }, { - cfg: &xdsConfig{}, + cfg: &xdsinternal.LBConfig{}, responseToSend: testCDSResp, - expectedChildPolicy: &loadBalancingConfig{ + expectedChildPolicy: &xdsinternal.LoadBalancingConfig{ Name: "ROUND_ROBIN", }, }, @@ -449,16 +450,16 @@ func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) { addr, td, _, cleanup := setupServer(t) - cfg := xdsConfig{ + cfg := xdsinternal.LBConfig{ BalancerName: addr, - ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, - FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, + ChildPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA}, + FallBackPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA}, } lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: &cfg}) addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} cfg2 := cfg - cfg2.FallBackPolicy = &loadBalancingConfig{Name: fakeBalancerB} + cfg2.FallBackPolicy = &xdsinternal.LoadBalancingConfig{Name: fakeBalancerB} lb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: addrs}, BalancerConfig: &cfg2, @@ -490,7 +491,7 @@ func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) { } cfg3 := cfg - cfg3.FallBackPolicy = &loadBalancingConfig{Name: fakeBalancerA} + cfg3.FallBackPolicy = &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA} lb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: addrs}, BalancerConfig: &cfg3, @@ -524,10 +525,10 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) { addr, td, _, cleanup := setupServer(t) defer cleanup() - cfg := &xdsConfig{ + cfg := &xdsinternal.LBConfig{ BalancerName: addr, - ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, - FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, + ChildPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA}, + FallBackPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA}, } lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg}) @@ -602,10 +603,10 @@ func (s) TestXdsBalancerFallBackSignalFromEdsBalancer(t *testing.T) { addr, td, _, cleanup := setupServer(t) defer cleanup() - cfg := &xdsConfig{ + cfg := &xdsinternal.LBConfig{ BalancerName: addr, - ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, - FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA}, + ChildPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA}, + FallBackPolicy: &xdsinternal.LoadBalancingConfig{Name: fakeBalancerA}, } lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg}) @@ -673,12 +674,12 @@ func (s) TestXdsBalancerConfigParsingSelectingLBPolicy(t *testing.T) { if err != nil { t.Fatalf("unable to unmarshal balancer config into xds config: %v", err) } - xdsCfg := cfg.(*xdsConfig) - wantChildPolicy := &loadBalancingConfig{Name: string(fakeBalancerA), Config: json.RawMessage(`{}`)} + xdsCfg := cfg.(*xdsinternal.LBConfig) + wantChildPolicy := &xdsinternal.LoadBalancingConfig{Name: string(fakeBalancerA), Config: json.RawMessage(`{}`)} if !reflect.DeepEqual(xdsCfg.ChildPolicy, wantChildPolicy) { t.Fatalf("got child policy %v, want %v", xdsCfg.ChildPolicy, wantChildPolicy) } - wantFallbackPolicy := &loadBalancingConfig{Name: string(fakeBalancerB), Config: json.RawMessage(`{}`)} + wantFallbackPolicy := &xdsinternal.LoadBalancingConfig{Name: string(fakeBalancerB), Config: json.RawMessage(`{}`)} if !reflect.DeepEqual(xdsCfg.FallBackPolicy, wantFallbackPolicy) { t.Fatalf("got fallback policy %v, want %v", xdsCfg.FallBackPolicy, wantFallbackPolicy) } @@ -688,18 +689,18 @@ func (s) TestXdsLoadbalancingConfigParsing(t *testing.T) { tests := []struct { name string s string - want *xdsConfig + want *xdsinternal.LBConfig }{ { name: "empty", s: "{}", - want: &xdsConfig{}, + want: &xdsinternal.LBConfig{}, }, { name: "success1", s: `{"childPolicy":[{"pick_first":{}}]}`, - want: &xdsConfig{ - ChildPolicy: &loadBalancingConfig{ + want: &xdsinternal.LBConfig{ + ChildPolicy: &xdsinternal.LoadBalancingConfig{ Name: "pick_first", Config: json.RawMessage(`{}`), }, @@ -708,8 +709,8 @@ func (s) TestXdsLoadbalancingConfigParsing(t *testing.T) { { name: "success2", s: `{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`, - want: &xdsConfig{ - ChildPolicy: &loadBalancingConfig{ + want: &xdsinternal.LBConfig{ + ChildPolicy: &xdsinternal.LoadBalancingConfig{ Name: "round_robin", Config: json.RawMessage(`{}`), }, @@ -718,7 +719,7 @@ func (s) TestXdsLoadbalancingConfigParsing(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var cfg xdsConfig + var cfg xdsinternal.LBConfig if err := json.Unmarshal([]byte(tt.s), &cfg); err != nil || !reflect.DeepEqual(&cfg, tt.want) { t.Errorf("test name: %s, parseFullServiceConfig() = %+v, err: %v, want %+v, ", tt.name, cfg, err, tt.want) } diff --git a/xds/internal/internal.go b/xds/internal/internal.go index 7403e3f2..85717fd0 100644 --- a/xds/internal/internal.go +++ b/xds/internal/internal.go @@ -18,8 +18,11 @@ package internal import ( + "encoding/json" "fmt" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/serviceconfig" basepb "google.golang.org/grpc/xds/internal/proto/envoy/api/v2/core/base" ) @@ -48,3 +51,88 @@ func (lamk Locality) ToProto() *basepb.Locality { SubZone: lamk.SubZone, } } + +// LBConfig represents the loadBalancingConfig section of the service config +// for xDS balancers. +type LBConfig struct { + serviceconfig.LoadBalancingConfig + // BalancerName represents the load balancer to use. + BalancerName string + // ChildPolicy represents the load balancing config for the child policy. + ChildPolicy *LoadBalancingConfig + // FallBackPolicy represents the load balancing config for the fallback. + FallBackPolicy *LoadBalancingConfig +} + +// UnmarshalJSON parses the JSON-encoded byte slice in data and stores it in l. +// When unmarshalling, we iterate through the childPolicy/fallbackPolicy lists +// and select the first LB policy which has been registered. +func (l *LBConfig) UnmarshalJSON(data []byte) error { + var val map[string]json.RawMessage + if err := json.Unmarshal(data, &val); err != nil { + return err + } + for k, v := range val { + switch k { + case "balancerName": + if err := json.Unmarshal(v, &l.BalancerName); err != nil { + return err + } + case "childPolicy": + var lbcfgs []*LoadBalancingConfig + if err := json.Unmarshal(v, &lbcfgs); err != nil { + return err + } + for _, lbcfg := range lbcfgs { + if balancer.Get(lbcfg.Name) != nil { + l.ChildPolicy = lbcfg + break + } + } + case "fallbackPolicy": + var lbcfgs []*LoadBalancingConfig + if err := json.Unmarshal(v, &lbcfgs); err != nil { + return err + } + for _, lbcfg := range lbcfgs { + if balancer.Get(lbcfg.Name) != nil { + l.FallBackPolicy = lbcfg + break + } + } + } + } + return nil +} + +// MarshalJSON returns a JSON enconding of l. +func (l *LBConfig) MarshalJSON() ([]byte, error) { + return nil, nil +} + +// LoadBalancingConfig represents a single load balancing config, +// stored in JSON format. +type LoadBalancingConfig struct { + Name string + Config json.RawMessage +} + +// MarshalJSON returns a JSON enconding of l. +func (l *LoadBalancingConfig) MarshalJSON() ([]byte, error) { + m := make(map[string]json.RawMessage) + m[l.Name] = l.Config + return json.Marshal(m) +} + +// UnmarshalJSON parses the JSON-encoded byte slice in data and stores it in l. +func (l *LoadBalancingConfig) UnmarshalJSON(data []byte) error { + var cfg map[string]json.RawMessage + if err := json.Unmarshal(data, &cfg); err != nil { + return err + } + for name, config := range cfg { + l.Name = name + l.Config = config + } + return nil +} diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go new file mode 100644 index 00000000..a2e87738 --- /dev/null +++ b/xds/internal/resolver/xds_resolver.go @@ -0,0 +1,103 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package resolver implements the xds resolver. +// +// At this point, the resolver is named xds-experimental, and doesn't do very +// much at all, except for returning a hard-coded service config which selects +// the xds_experimental balancer. +package resolver + +import ( + "fmt" + "sync" + + "google.golang.org/grpc" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" +) + +const ( + // The JSON form of the hard-coded service config which picks the + // xds_experimental balancer with round_robin as the child policy. + jsonSC = `{ + "loadBalancingConfig":[ + { + "xds_experimental":{ + "childPolicy":[ + { + "round_robin": {} + } + ] + } + } + ] + }` + // xDS balancer name is xds_experimental while resolver scheme is + // xds-experimental since "_" is not a valid character in the URL. + xdsScheme = "xds-experimental" +) + +var ( + parseOnce sync.Once + parsedSC serviceconfig.Config +) + +// NewBuilder creates a new implementation of the resolver.Builder interface +// for the xDS resolver. +func NewBuilder() resolver.Builder { + return &xdsBuilder{} +} + +type xdsBuilder struct{} + +// Build helps implement the resolver.Builder interface. +func (b *xdsBuilder) Build(t resolver.Target, cc resolver.ClientConn, o resolver.BuildOption) (resolver.Resolver, error) { + parseOnce.Do(func() { + // The xds balancer must have been registered at this point for the service + // config to be parsed properly. + psc, err := internal.ParseServiceConfig(jsonSC) + if err != nil { + panic(fmt.Sprintf("service config %s parsing failed: %v", jsonSC, err)) + } + + var ok bool + if parsedSC, ok = psc.(*grpc.ServiceConfig); !ok { + panic(fmt.Sprintf("service config type is [%T], want [grpc.ServiceConfig]", psc)) + } + }) + + // We return a resolver which bacically does nothing. The hard-coded service + // config returned here picks the xds balancer. + cc.UpdateState(resolver.State{ServiceConfig: parsedSC}) + return &xdsResolver{}, nil +} + +// Name helps implement the resolver.Builder interface. +func (*xdsBuilder) Scheme() string { + return xdsScheme +} + +type xdsResolver struct{} + +// ResolveNow is a no-op at this point. +func (*xdsResolver) ResolveNow(o resolver.ResolveNowOption) {} + +// Close is a no-op at this point. +func (*xdsResolver) Close() {} diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go new file mode 100644 index 00000000..bfcbfa14 --- /dev/null +++ b/xds/internal/resolver/xds_resolver_test.go @@ -0,0 +1,196 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package resolver + +import ( + "encoding/json" + "fmt" + "reflect" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" + xdsinternal "google.golang.org/grpc/xds/internal" +) + +// This is initialized at init time. +var fbb *fakeBalancerBuilder + +// We register a fake balancer builder and the actual xds_resolver here. We use +// the fake balancer builder to verify the service config pushed by the +// resolver. +func init() { + resolver.Register(NewBuilder()) + fbb = &fakeBalancerBuilder{ + wantLBConfig: &wrappedLBConfig{lbCfg: json.RawMessage(`{ + "childPolicy":[ + { + "round_robin": {} + } + ] + }`)}, + errCh: make(chan error), + } + balancer.Register(fbb) +} + +// testClientConn is a fake implemetation of resolver.ClientConn. All is does +// is to store the state received from the resolver locally and close the +// provided done channel. +type testClientConn struct { + done chan struct{} + gotState resolver.State +} + +func (t *testClientConn) UpdateState(s resolver.State) { + t.gotState = s + close(t.done) +} + +func (*testClientConn) NewAddress(addresses []resolver.Address) { panic("unimplemented") } +func (*testClientConn) NewServiceConfig(serviceConfig string) { panic("unimplemented") } + +// TestXDSRsolverSchemeAndAddresses creates a new xds resolver, verifies that +// it returns an empty address list and the appropriate xds-experimental +// scheme. +func TestXDSRsolverSchemeAndAddresses(t *testing.T) { + b := NewBuilder() + wantScheme := "xds-experimental" + if b.Scheme() != wantScheme { + t.Fatalf("got scheme %s, want %s", b.Scheme(), wantScheme) + } + + tcc := &testClientConn{done: make(chan struct{})} + r, err := b.Build(resolver.Target{}, tcc, resolver.BuildOption{}) + if err != nil { + t.Fatalf("xdsBuilder.Build() failed with error: %v", err) + } + defer r.Close() + + <-tcc.done + if len(tcc.gotState.Addresses) != 0 { + t.Fatalf("got address list from resolver %v, want empty list", tcc.gotState.Addresses) + } +} + +// fakeBalancer is used to verify that the xds_resolver returns the expected +// serice config. +type fakeBalancer struct { + wantLBConfig *wrappedLBConfig + errCh chan error +} + +func (*fakeBalancer) HandleSubConnStateChange(_ balancer.SubConn, _ connectivity.State) { + panic("unimplemented") +} +func (*fakeBalancer) HandleResolvedAddrs(_ []resolver.Address, _ error) { + panic("unimplemented") +} + +// UpdateClientConnState verifies that the received LBConfig matches the +// provided one, and if not, sends an error on the provided channel. +func (f *fakeBalancer) UpdateClientConnState(ccs balancer.ClientConnState) { + gotLBConfig, ok := ccs.BalancerConfig.(*wrappedLBConfig) + if !ok { + f.errCh <- fmt.Errorf("in fakeBalancer got lbConfig of type %T, want %T", ccs.BalancerConfig, &wrappedLBConfig{}) + return + } + + var gotCfg, wantCfg xdsinternal.LBConfig + if err := wantCfg.UnmarshalJSON(f.wantLBConfig.lbCfg); err != nil { + f.errCh <- fmt.Errorf("unable to unmarshal balancer config %s into xds config", string(f.wantLBConfig.lbCfg)) + return + } + if err := gotCfg.UnmarshalJSON(gotLBConfig.lbCfg); err != nil { + f.errCh <- fmt.Errorf("unable to unmarshal balancer config %s into xds config", string(gotLBConfig.lbCfg)) + return + } + if !reflect.DeepEqual(gotCfg, wantCfg) { + f.errCh <- fmt.Errorf("in fakeBalancer got lbConfig %v, want %v", gotCfg, wantCfg) + return + } + + f.errCh <- nil +} + +func (*fakeBalancer) UpdateSubConnState(_ balancer.SubConn, _ balancer.SubConnState) { + panic("unimplemented") +} + +func (*fakeBalancer) Close() {} + +// fakeBalancerBuilder builds a fake balancer and also provides a ParseConfig +// method (which doesn't really the parse config, but just stores it as is). +type fakeBalancerBuilder struct { + wantLBConfig *wrappedLBConfig + errCh chan error +} + +func (f *fakeBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + return &fakeBalancer{f.wantLBConfig, f.errCh} +} + +func (f *fakeBalancerBuilder) Name() string { + return "xds_experimental" +} + +func (f *fakeBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + return &wrappedLBConfig{lbCfg: c}, nil +} + +// wrappedLBConfig simply wraps the provided LB config with a +// serviceconfig.LoadBalancingConfig interface. +type wrappedLBConfig struct { + serviceconfig.LoadBalancingConfig + lbCfg json.RawMessage +} + +// TestXDSRsolverServiceConfig verifies that the xds_resolver returns the +// expected service config. +// +// The following sequence of events happen in this test: +// * The xds_experimental balancer (fake) and resolver builders are initialized +// at init time. +// * We dial a dummy address here with the xds-experimental scheme. This should +// pick the xds_resolver, which should return the hard-coded service config, +// which should reach the fake balancer that we registered (because the +// service config asks for the xds balancer). +// * In the fake balancer, we verify that we receive the expected LB config. +func TestXDSRsolverServiceConfig(t *testing.T) { + xdsAddr := fmt.Sprintf("%s:///dummy", xdsScheme) + cc, err := grpc.Dial(xdsAddr, grpc.WithInsecure()) + if err != nil { + t.Fatalf("grpc.Dial(%s) failed with error: %v", xdsAddr, err) + } + defer cc.Close() + + timer := time.NewTimer(5 * time.Second) + select { + case <-timer.C: + t.Fatal("timed out waiting for service config to reach balancer") + case err := <-fbb.errCh: + if err != nil { + t.Error(err) + } + } +}