xds: add weighted_target balancer (#3541)

Menghan Li
2020-04-29 12:56:03 -07:00
committed by GitHub
parent eedec2c1c3
commit fe604e1fdf
6 changed files with 589 additions and 4 deletions

xds/internal/balancer/balancergroup/balancergroup.go

@@ -126,6 +126,22 @@ func (sbc *subBalancerWithConfig) updateClientConnState(s balancer.ClientConnState) error {
return b.UpdateClientConnState(s)
}
func (sbc *subBalancerWithConfig) resolverError(err error) {
b := sbc.balancer
if b == nil {
// This sub-balancer was closed. This should never happen because
// sub-balancers are closed when the locality is removed from EDS, or
// when the balancer group is closed. There should be no further updates
// after either of these happens.
//
// This will be a common case with priority support, because a
// sub-balancer (and the whole balancer group) could be closed when it is
// the lower priority, but it can still receive address updates.
return
}
b.ResolverError(err)
}
func (sbc *subBalancerWithConfig) stopBalancer() {
sbc.balancer.Close()
sbc.balancer = nil
@@ -451,10 +467,14 @@ func (bg *BalancerGroup) UpdateClientConnState(id internal.LocalityID, s balancer.ClientConnState) error {
return nil
}
// TODO: handleServiceConfig()
//
// For BNS addresses in the slicer case, the service config comes from
// endpoint.Metadata, and will be sent from the parent to sub-balancers as
// service config.

// ResolverError forwards resolver errors to all sub-balancers.
func (bg *BalancerGroup) ResolverError(err error) {
bg.outgoingMu.Lock()
for _, config := range bg.idToBalancerConfig {
config.resolverError(err)
}
bg.outgoingMu.Unlock()
}
// The following are actions from sub-balancers, forwarded to ClientConn.

xds/internal/balancer/weightedtarget/logging.go

@@ -0,0 +1,29 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package weightedtarget

import (
"fmt"
)
const prefix = "[weighted-target-lb %p] "
func loggingPrefix(p *weightedTargetBalancer) string {
return fmt.Sprintf(prefix, p)
}
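With this prefix, every log line from a weighted-target balancer instance is tagged with the balancer's pointer, so concurrent instances can be told apart. For example, the Infof("Created") call in Build (next file) would emit a line roughly like the following (pointer value illustrative):

INFO: [weighted-target-lb 0xc0001a2000] Created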

xds/internal/balancer/weightedtarget/weightedtarget.go

@@ -0,0 +1,151 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package weightedtarget implements the weighted_target balancer.
package weightedtarget

import (
"encoding/json"
"fmt"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/hierarchy"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
)
const weightedTargetName = "weighted_target_experimental"
func init() {
balancer.Register(&weightedTargetBB{})
}
type weightedTargetBB struct{}
func (wt *weightedTargetBB) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
b := &weightedTargetBalancer{}
b.logger = grpclog.NewPrefixLogger(loggingPrefix(b))
b.bg = balancergroup.New(cc, nil, b.logger)
b.bg.Start()
b.logger.Infof("Created")
return b
}
func (wt *weightedTargetBB) Name() string {
return weightedTargetName
}
func (wt *weightedTargetBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return parseConfig(c)
}
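Since weightedTargetBB implements balancer.ConfigParser, the policy can be configured with JSON. As a hedged sketch, a service config selecting this policy directly could look like the following; in practice the config usually comes from a parent xDS policy, and the cluster names and weights here are made up (the field names match the test config further down):

{
  "loadBalancingConfig": [{
    "weighted_target_experimental": {
      "targets": {
        "cluster_1": {"weight": 75, "childPolicy": [{"round_robin": ""}]},
        "cluster_2": {"weight": 25, "childPolicy": [{"round_robin": ""}]}
      }
    }
  }]
}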
type weightedTargetBalancer struct {
logger *grpclog.PrefixLogger
// TODO: Make this package not dependent on any xds specific code.
// BalancerGroup uses xdsinternal.LocalityID as the key in the map of child
// policies that it maintains and reports load using LRS. Once these two
// dependencies are removed from the balancerGroup, this package will not
// have any dependencies on xds code.
bg *balancergroup.BalancerGroup
targets map[string]target
}
// TODO: remove this and use strings directly as keys for balancer group.
func makeLocalityFromName(name string) internal.LocalityID {
return internal.LocalityID{Region: name}
}
// UpdateClientConnState takes the new targets from the balancer config,
// creates/deletes sub-balancers in the balancer group, and forwards the
// updates to them. Addresses are split into groups based on their hierarchy
// paths.
func (w *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
newConfig, ok := s.BalancerConfig.(*lbConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}
addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
// Remove sub-balancers that are not in the new config.
for name := range w.targets {
if _, ok := newConfig.Targets[name]; !ok {
w.bg.Remove(makeLocalityFromName(name))
}
}
// For sub-balancers in the new config:
// - if it's new, add it to the balancer group,
// - if it's old but has a new weight, update its weight in the balancer
//   group.
//
// For all sub-balancers, forward the address/balancer config update.
for name, newT := range newConfig.Targets {
l := makeLocalityFromName(name)
oldT, ok := w.targets[name]
if !ok {
// If this is a new sub-balancer, add it.
w.bg.Add(l, newT.Weight, balancer.Get(newT.ChildPolicy.Name))
} else if newT.Weight != oldT.Weight {
// If this is an existing sub-balancer, update weight if necessary.
w.bg.ChangeWeight(l, newT.Weight)
}
// Forward the whole update:
// - Addresses come from the map after splitting by hierarchy path,
// - The top-level service config and attributes are unchanged,
// - The balancer config comes from the targets map.
//
// TODO: handle error? How to aggregate errors and return?
_ = w.bg.UpdateClientConnState(l, balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: addressesSplit[name],
ServiceConfig: s.ResolverState.ServiceConfig,
Attributes: s.ResolverState.Attributes,
},
BalancerConfig: newT.ChildPolicy.Config,
})
}
w.targets = newConfig.Targets
return nil
}
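For illustration, a minimal sketch of the address splitting performed above, using the same hierarchy helpers (the addresses and cluster names are made up, and hierarchy is an internal package, so this only compiles inside the grpc module):

// Attach a one-element hierarchy path to each address, then group by it.
addrs := []resolver.Address{
    hierarchy.Set(resolver.Address{Addr: "10.0.0.1:8080"}, []string{"cluster_1"}),
    hierarchy.Set(resolver.Address{Addr: "10.0.0.2:8080"}, []string{"cluster_2"}),
}
addressesSplit := hierarchy.Group(addrs)
// addressesSplit["cluster_1"] now holds only 10.0.0.1:8080, with the
// "cluster_1" element popped from that address's path (leaving it empty).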
func (w *weightedTargetBalancer) ResolverError(err error) {
w.bg.ResolverError(err)
}
func (w *weightedTargetBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
w.bg.UpdateSubConnState(sc, state)
}
func (w *weightedTargetBalancer) Close() {
w.bg.Close()
}
func (w *weightedTargetBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
w.logger.Errorf("UpdateSubConnState should be called instead of HandleSubConnStateChange")
}
func (w *weightedTargetBalancer) HandleResolvedAddrs([]resolver.Address, error) {
w.logger.Errorf("UpdateClientConnState should be called instead of HandleResolvedAddrs")
}

xds/internal/balancer/weightedtarget/weightedtarget_config.go

@@ -0,0 +1,56 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package weightedtarget

import (
"encoding/json"

internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/serviceconfig"
)
type target struct {
// Weight is the weight of the child policy.
Weight uint32
// ChildPolicy is the child policy and its config.
ChildPolicy *internalserviceconfig.BalancerConfig
}
// lbConfig is the balancer config for weighted_target. The proto representation
// is:
//
// message WeightedTargetConfig {
// message Target {
// uint32 weight = 1;
// repeated LoadBalancingConfig child_policy = 2;
// }
// map<string, Target> targets = 1;
// }
type lbConfig struct {
serviceconfig.LoadBalancingConfig
Targets map[string]target
}
func parseConfig(c json.RawMessage) (*lbConfig, error) {
var cfg lbConfig
if err := json.Unmarshal(c, &cfg); err != nil {
return nil, err
}
return &cfg, nil
}
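A quick sketch of the parser on input shaped like the proto message above (names are illustrative; the childPolicy entry assumes the round_robin balancer is registered, which importing google.golang.org/grpc/balancer/roundrobin guarantees):

cfg, err := parseConfig([]byte(`{
    "targets": {
        "cluster_1": {"weight": 75, "childPolicy": [{"round_robin": ""}]}
    }
}`))
if err != nil {
    // Handle malformed config JSON.
}
// cfg.Targets["cluster_1"].Weight == 75, and
// cfg.Targets["cluster_1"].ChildPolicy.Name == "round_robin".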

xds/internal/balancer/weightedtarget/weightedtarget_config_test.go

@@ -0,0 +1,104 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package weightedtarget

import (
"testing"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer"
)
const (
testJSONConfig = `{
"targets": {
"cluster_1" : {
"weight":75,
"childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}]
},
"cluster_2" : {
"weight":25,
"childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}]
}
}
}`
cdsName = "cds_experimental"
)
var (
cdsConfigParser = balancer.Get(cdsName).(balancer.ConfigParser)
cdsConfigJSON1 = `{"cluster":"cluster_1"}`
cdsConfig1, _ = cdsConfigParser.ParseConfig([]byte(cdsConfigJSON1))
cdsConfigJSON2 = `{"cluster":"cluster_2"}`
cdsConfig2, _ = cdsConfigParser.ParseConfig([]byte(cdsConfigJSON2))
)
func Test_parseConfig(t *testing.T) {
tests := []struct {
name string
js string
want *lbConfig
wantErr bool
}{
{
name: "empty json",
js: "",
want: nil,
wantErr: true,
},
{
name: "OK",
js: testJSONConfig,
want: &lbConfig{
Targets: map[string]target{
"cluster_1": {
Weight: 75,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: cdsName,
Config: cdsConfig1,
},
},
"cluster_2": {
Weight: 25,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: cdsName,
Config: cdsConfig2,
},
},
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseConfig([]byte(tt.js))
if (err != nil) != tt.wantErr {
t.Errorf("parseConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !cmp.Equal(got, tt.want) {
t.Errorf("parseConfig() got unexpected result, diff: %v", cmp.Diff(got, tt.want))
}
})
}
}

xds/internal/balancer/weightedtarget/weightedtarget_test.go

@@ -0,0 +1,225 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package weightedtarget

import (
"encoding/json"
"fmt"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/hierarchy"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/testutils"
)
type testConfigBalancerBuilder struct {
balancer.Builder
}
func newTestConfigBalancerBuilder() *testConfigBalancerBuilder {
return &testConfigBalancerBuilder{
Builder: balancer.Get(roundrobin.Name),
}
}
func (t *testConfigBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
rr := t.Builder.Build(cc, opts)
return &testConfigBalancer{
Balancer: rr,
}
}
const testConfigBalancerName = "test_config_balancer"
func (t *testConfigBalancerBuilder) Name() string {
return testConfigBalancerName
}
type stringBalancerConfig struct {
serviceconfig.LoadBalancingConfig
s string
}
func (t *testConfigBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
// Return the config string without the surrounding quotes.
return stringBalancerConfig{s: string(c[1 : len(c)-1])}, nil
}
// testConfigBalancer is a roundrobin balancer, but it takes the balancer
// config string and appends it to the backend addresses.
type testConfigBalancer struct {
balancer.Balancer
}
func (b *testConfigBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
c, ok := s.BalancerConfig.(stringBalancerConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type %T", s.BalancerConfig)
}
oneMoreAddr := resolver.Address{Addr: c.s}
s.BalancerConfig = nil
s.ResolverState.Addresses = append(s.ResolverState.Addresses, oneMoreAddr)
return b.Balancer.UpdateClientConnState(s)
}
func (b *testConfigBalancer) Close() {
b.Balancer.Close()
}
var (
wtbBuilder balancer.Builder
wtbParser balancer.ConfigParser
testBackendAddrStrs []string
)
const testBackendAddrsCount = 12
func init() {
balancer.Register(newTestConfigBalancerBuilder())
for i := 0; i < testBackendAddrsCount; i++ {
testBackendAddrStrs = append(testBackendAddrStrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
}
wtbBuilder = balancer.Get(weightedTargetName)
wtbParser = wtbBuilder.(balancer.ConfigParser)
balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
}
// TestWeightedTarget covers the cases where a sub-balancer is added and a
// sub-balancer is removed. It verifies that the addresses and balancer configs
// are forwarded to the right sub-balancer.
//
// This test is intended to test the glue code in weighted_target. Most of the
// functionality tests are covered by the balancer group tests.
func TestWeightedTarget(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
// Start with "cluster_1: round_robin".
config1, err := wtbParser.ParseConfig([]byte(`{"targets":{"cluster_1":{"weight":1,"childPolicy":[{"round_robin":""}]}}}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and an address with hierarchy path ["cluster_1"].
wantAddr1 := resolver.Address{Addr: testBackendAddrStrs[0], Attributes: nil}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddr1, []string{"cluster_1"}),
}},
BalancerConfig: config1,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Verify that a subconn is created with the address, and the hierarchy path
// in the address is cleared.
addr1 := <-cc.NewSubConnAddrsCh
if want := []resolver.Address{
hierarchy.Set(wantAddr1, []string{}),
}; !cmp.Equal(addr1, want, cmp.AllowUnexported(attributes.Attributes{})) {
t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr1, want, cmp.AllowUnexported(attributes.Attributes{})))
}
// Send subconn state change.
sc1 := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
}
// Remove cluster_1, and add "cluster_2: test_config_balancer".
wantAddr3Str := testBackendAddrStrs[2]
config2, err := wtbParser.ParseConfig([]byte(
fmt.Sprintf(`{"targets":{"cluster_2":{"weight":1,"childPolicy":[{%q:%q}]}}}`, testConfigBalancerName, wantAddr3Str),
))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and one address with hierarchy path "cluster_2".
wantAddr2 := resolver.Address{Addr: testBackendAddrStrs[1], Attributes: nil}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddr2, []string{"cluster_2"}),
}},
BalancerConfig: config2,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Expect the address to be sent in the new subconn's address list, with the
// hierarchy path cleared.
addr2 := <-cc.NewSubConnAddrsCh
if want := []resolver.Address{
hierarchy.Set(wantAddr2, []string{}),
}; !cmp.Equal(addr2, want, cmp.AllowUnexported(attributes.Attributes{})) {
t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr2, want, cmp.AllowUnexported(attributes.Attributes{})))
}
// Expect the other address, which came in through the balancer config, to be
// sent in its own subconn. This address doesn't have a hierarchy path.
wantAddr3 := resolver.Address{Addr: wantAddr3Str, Attributes: nil}
addr3 := <-cc.NewSubConnAddrsCh
if want := []resolver.Address{wantAddr3}; !cmp.Equal(addr3, want, cmp.AllowUnexported(attributes.Attributes{})) {
t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr3, want, cmp.AllowUnexported(attributes.Attributes{})))
}
// The subconn for cluster_1 should be removed.
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
}
wtb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
sc2 := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc3 := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin pick with backends in cluster_2.
p2 := <-cc.NewPickerCh
want := []balancer.SubConn{sc2, sc3}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
return func() balancer.SubConn {
scst, _ := p.Pick(balancer.PickInfo{})
return scst.SubConn
}
}