xds: eds balancer (#2624)

The balancer that handles eds response and does two level balancing (locality level and endpoint level).
This commit is contained in:
Menghan Li
2019-02-28 16:19:50 -08:00
committed by GitHub
parent cba9cfed64
commit 914c52b240
7 changed files with 958 additions and 41 deletions

View File

@ -14,9 +14,6 @@
* limitations under the License. * limitations under the License.
*/ */
// TODO: move package comment to edsbalancer.go.
// Package edsbalancer implements balancer generated from an eds response.
package edsbalancer package edsbalancer
import ( import (
@ -238,6 +235,10 @@ func (bg *balancerGroup) close() {
for _, b := range bg.idToBalancer { for _, b := range bg.idToBalancer {
b.Close() b.Close()
} }
// Also remove all SubConns.
for sc := range bg.scToID {
bg.cc.RemoveSubConn(sc)
}
bg.mu.Unlock() bg.mu.Unlock()
} }

View File

@ -1,5 +1,4 @@
/* /*
*
* Copyright 2019 gRPC authors. * Copyright 2019 gRPC authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
@ -13,46 +12,294 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*
*/ */
// Package edsbalancer implements a balancer to handle EDS responses.
package edsbalancer package edsbalancer
// TODO: this file is used as a place holder. It should be deleted after edsbalancer implementation
// is merged.
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt"
"reflect"
"sync"
v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
xdstypepb "github.com/envoyproxy/go-control-plane/envoy/type"
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity" "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
) )
type dummyEdsBalancer struct{} type localityConfig struct {
weight uint32
func (d *dummyEdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { addrs []resolver.Address
panic("implement me")
} }
func (d *dummyEdsBalancer) HandleResolvedAddrs([]resolver.Address, error) { // EDSBalancer does load balancing based on the EDS responses. Note that it
panic("implement me") // doesn't implement the balancer interface. It's intended to be used by a high
// level balancer implementation.
//
// The localities are picked as weighted round robin. A configurable child
// policy is used to manage endpoints in each locality.
type EDSBalancer struct {
balancer.ClientConn
bg *balancerGroup
subBalancerBuilder balancer.Builder
lidToConfig map[string]*localityConfig
pickerMu sync.Mutex
drops []*dropper
innerPicker balancer.Picker // The picker without drop support.
innerState connectivity.State // The state of the picker.
} }
func (d *dummyEdsBalancer) Close() { // NewXDSBalancer create a new EDSBalancer.
panic("implement me") func NewXDSBalancer(cc balancer.ClientConn) *EDSBalancer {
xdsB := &EDSBalancer{
ClientConn: cc,
subBalancerBuilder: balancer.Get(roundrobin.Name),
lidToConfig: make(map[string]*localityConfig),
}
// Don't start balancer group here. Start it when handling the first EDS
// response. Otherwise the balancer group will be started with round-robin,
// and if users specify a different sub-balancer, all balancers in balancer
// group will be closed and recreated when sub-balancer update happens.
return xdsB
} }
func (d *dummyEdsBalancer) HandleEDSResponse(edsResp *v2.ClusterLoadAssignment) { // HandleChildPolicy updates the child balancers handling endpoints. Child
panic("implement me") // policy is roundrobin by default. If the specified balancer is not installed,
// the old child balancer will be used.
//
// HandleChildPolicy and HandleEDSResponse must be called by the same goroutine.
func (xdsB *EDSBalancer) HandleChildPolicy(name string, config json.RawMessage) {
// name could come from cdsResp.GetLbPolicy().String(). LbPolicy.String()
// are all UPPER_CASE with underscore.
//
// No conversion is needed here because balancer package converts all names
// into lower_case before registering/looking up.
xdsB.updateSubBalancerName(name)
// TODO: (eds) send balancer config to the new child balancers.
} }
func (d *dummyEdsBalancer) HandleChildPolicy(name string, config json.RawMessage) { func (xdsB *EDSBalancer) updateSubBalancerName(subBalancerName string) {
panic("implement me") if xdsB.subBalancerBuilder.Name() == subBalancerName {
return
}
newSubBalancerBuilder := balancer.Get(subBalancerName)
if newSubBalancerBuilder == nil {
grpclog.Infof("EDSBalancer: failed to find balancer with name %q, keep using %q", subBalancerName, xdsB.subBalancerBuilder.Name())
return
}
xdsB.subBalancerBuilder = newSubBalancerBuilder
if xdsB.bg != nil {
// xdsB.bg == nil until the first EDS response is handled. There's no
// need to update balancer group before that.
for id, config := range xdsB.lidToConfig {
// TODO: (eds) add support to balancer group to support smoothly
// switching sub-balancers (keep old balancer around until new
// balancer becomes ready).
xdsB.bg.remove(id)
xdsB.bg.add(id, config.weight, xdsB.subBalancerBuilder)
xdsB.bg.handleResolvedAddrs(id, config.addrs)
}
}
} }
// NewXDSBalancer creates an edsBalancer // updateDrops compares new drop policies with the old. If they are different,
func NewXDSBalancer(cc balancer.ClientConn) interface{} { // it updates the drop policies and send ClientConn an updated picker.
return &dummyEdsBalancer{} func (xdsB *EDSBalancer) updateDrops(dropPolicies []*xdspb.ClusterLoadAssignment_Policy_DropOverload) {
var (
newDrops []*dropper
dropsChanged bool
)
for i, dropPolicy := range dropPolicies {
percentage := dropPolicy.GetDropPercentage()
var (
numerator = percentage.GetNumerator()
denominator uint32
)
switch percentage.GetDenominator() {
case xdstypepb.FractionalPercent_HUNDRED:
denominator = 100
case xdstypepb.FractionalPercent_TEN_THOUSAND:
denominator = 10000
case xdstypepb.FractionalPercent_MILLION:
denominator = 1000000
}
newDrops = append(newDrops, newDropper(numerator, denominator))
// The following reading xdsB.drops doesn't need mutex because it can only
// be updated by the code following.
if dropsChanged {
continue
}
if i >= len(xdsB.drops) {
dropsChanged = true
continue
}
if oldDrop := xdsB.drops[i]; numerator != oldDrop.numerator || denominator != oldDrop.denominator {
dropsChanged = true
}
}
if dropsChanged {
xdsB.pickerMu.Lock()
xdsB.drops = newDrops
if xdsB.innerPicker != nil {
// Update picker with old inner picker, new drops.
xdsB.ClientConn.UpdateBalancerState(xdsB.innerState, newDropPicker(xdsB.innerPicker, newDrops))
}
xdsB.pickerMu.Unlock()
}
}
// HandleEDSResponse handles the EDS response and creates/deletes localities and
// SubConns. It also handles drops.
//
// HandleCDSResponse and HandleEDSResponse must be called by the same goroutine.
func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment) {
// Create balancer group if it's never created (this is the first EDS
// response).
if xdsB.bg == nil {
xdsB.bg = newBalancerGroup(xdsB)
}
// TODO: Unhandled fields from EDS response:
// - edsResp.GetPolicy().GetOverprovisioningFactor()
// - locality.GetPriority()
// - lbEndpoint.GetMetadata(): contains BNS name, send to sub-balancers
// - as service config or as resolved address
// - if socketAddress is not ip:port
// - socketAddress.GetNamedPort(), socketAddress.GetResolverName()
// - resolve endpoint's name with another resolver
xdsB.updateDrops(edsResp.GetPolicy().GetDropOverloads())
// newLocalitiesSet contains all names of localitis in the new EDS response.
// It's used to delete localities that are removed in the new EDS response.
newLocalitiesSet := make(map[string]struct{})
for _, locality := range edsResp.Endpoints {
// One balancer for each locality.
l := locality.GetLocality()
if l == nil {
grpclog.Warningf("xds: received LocalityLbEndpoints with <nil> Locality")
continue
}
lid := fmt.Sprintf("%s-%s-%s", l.Region, l.Zone, l.SubZone)
newLocalitiesSet[lid] = struct{}{}
newWeight := locality.GetLoadBalancingWeight().GetValue()
if newWeight == 0 {
// Weight can never be 0.
newWeight = 1
}
var newAddrs []resolver.Address
for _, lbEndpoint := range locality.GetLbEndpoints() {
socketAddress := lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()
newAddrs = append(newAddrs, resolver.Address{
Addr: fmt.Sprintf("%s:%d", socketAddress.GetAddress(), socketAddress.GetPortValue()),
})
}
var weightChanged, addrsChanged bool
config, ok := xdsB.lidToConfig[lid]
if !ok {
// A new balancer, add it to balancer group and balancer map.
xdsB.bg.add(lid, newWeight, xdsB.subBalancerBuilder)
config = &localityConfig{
weight: newWeight,
}
xdsB.lidToConfig[lid] = config
// weightChanged is false for new locality, because there's no need to
// update weight in bg.
addrsChanged = true
} else {
// Compare weight and addrs.
if config.weight != newWeight {
weightChanged = true
}
if !reflect.DeepEqual(config.addrs, newAddrs) {
addrsChanged = true
}
}
if weightChanged {
config.weight = newWeight
xdsB.bg.changeWeight(lid, newWeight)
}
if addrsChanged {
config.addrs = newAddrs
xdsB.bg.handleResolvedAddrs(lid, newAddrs)
}
}
// Delete localities that are removed in the latest response.
for lid := range xdsB.lidToConfig {
if _, ok := newLocalitiesSet[lid]; !ok {
xdsB.bg.remove(lid)
delete(xdsB.lidToConfig, lid)
}
}
}
// HandleSubConnStateChange handles the state change and update pickers accordingly.
func (xdsB *EDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
xdsB.bg.handleSubConnStateChange(sc, s)
}
// UpdateBalancerState overrides balancer.ClientConn to wrap the picker in a
// dropPicker.
func (xdsB *EDSBalancer) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
xdsB.pickerMu.Lock()
defer xdsB.pickerMu.Unlock()
xdsB.innerPicker = p
xdsB.innerState = s
// Don't reset drops when it's a state change.
xdsB.ClientConn.UpdateBalancerState(s, newDropPicker(p, xdsB.drops))
}
// Close closes the balancer.
func (xdsB *EDSBalancer) Close() {
xdsB.bg.close()
}
type dropPicker struct {
drops []*dropper
p balancer.Picker
}
func newDropPicker(p balancer.Picker, drops []*dropper) *dropPicker {
return &dropPicker{
drops: drops,
p: p,
}
}
func (d *dropPicker) Pick(ctx context.Context, opts balancer.PickOptions) (conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
var drop bool
for _, dp := range d.drops {
// It's necessary to call drop on all droppers if the droppers are
// stateful. For example, if the second drop only drops 1/2, and only
// drops even number picks, we need to call it's drop() even if the
// first dropper already returned true.
//
// It won't be necessary if droppers are stateless, like toss a coin.
drop = drop || dp.drop()
}
if drop {
return nil, nil, status.Errorf(codes.Unavailable, "RPC is dropped")
}
// TODO: (eds) don't drop unless the inner picker is READY. Similar to
// https://github.com/grpc/grpc-go/issues/2622.
return d.p.Pick(ctx, opts)
} }

View File

@ -0,0 +1,526 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edsbalancer
import (
"context"
"fmt"
"net"
"reflect"
"strconv"
"testing"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
xdstypepb "github.com/envoyproxy/go-control-plane/envoy/type"
typespb "github.com/gogo/protobuf/types"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
)
var (
testClusterNames = []string{"test-cluster-1", "test-cluster-2"}
testSubZones = []string{"I", "II", "III", "IV"}
testEndpointAddrs = []string{"1.1.1.1:1", "2.2.2.2:2", "3.3.3.3:3", "4.4.4.4:4"}
)
type clusterLoadAssignmentBuilder struct {
v *xdspb.ClusterLoadAssignment
}
func newClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32) *clusterLoadAssignmentBuilder {
var drops []*xdspb.ClusterLoadAssignment_Policy_DropOverload
for i, d := range dropPercents {
drops = append(drops, &xdspb.ClusterLoadAssignment_Policy_DropOverload{
Category: fmt.Sprintf("test-drop-%d", i),
DropPercentage: &xdstypepb.FractionalPercent{
Numerator: d,
Denominator: xdstypepb.FractionalPercent_HUNDRED,
},
})
}
return &clusterLoadAssignmentBuilder{
v: &xdspb.ClusterLoadAssignment{
ClusterName: clusterName,
Policy: &xdspb.ClusterLoadAssignment_Policy{
DropOverloads: drops,
},
},
}
}
func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uint32, addrsWithPort []string) {
var lbEndPoints []endpointpb.LbEndpoint
for _, a := range addrsWithPort {
host, portStr, err := net.SplitHostPort(a)
if err != nil {
panic("failed to split " + a)
}
port, err := strconv.Atoi(portStr)
if err != nil {
panic("failed to atoi " + portStr)
}
lbEndPoints = append(lbEndPoints, endpointpb.LbEndpoint{
HostIdentifier: &endpointpb.LbEndpoint_Endpoint{
Endpoint: &endpointpb.Endpoint{
Address: &corepb.Address{
Address: &corepb.Address_SocketAddress{
SocketAddress: &corepb.SocketAddress{
Protocol: corepb.TCP,
Address: host,
PortSpecifier: &corepb.SocketAddress_PortValue{
PortValue: uint32(port)}}}}}}},
)
}
clab.v.Endpoints = append(clab.v.Endpoints, endpointpb.LocalityLbEndpoints{
Locality: &corepb.Locality{
Region: "",
Zone: "",
SubZone: subzone,
},
LbEndpoints: lbEndPoints,
LoadBalancingWeight: &typespb.UInt32Value{Value: weight},
})
}
func (clab *clusterLoadAssignmentBuilder) build() *xdspb.ClusterLoadAssignment {
return clab.v
}
// One locality
// - add backend
// - remove backend
// - replace backend
// - change drop rate
func TestEDS_OneLocality(t *testing.T) {
cc := newTestClientConn(t)
edsb := NewXDSBalancer(cc)
// One locality with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
edsb.HandleEDSResponse(clab1.build())
sc1 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)
// Pick with only the first backend.
p1 := <-cc.newPickerCh
for i := 0; i < 5; i++ {
gotSC, _, _ := p1.Pick(context.Background(), balancer.PickOptions{})
if !reflect.DeepEqual(gotSC, sc1) {
t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc1)
}
}
// The same locality, add one more backend.
clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:2])
edsb.HandleEDSResponse(clab2.build())
sc2 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)
// Test roundrobin with two subconns.
p2 := <-cc.newPickerCh
want := []balancer.SubConn{sc1, sc2}
if err := isRoundRobin(want, func() balancer.SubConn {
sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{})
return sc
}); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// The same locality, delete first backend.
clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.addLocality(testSubZones[0], 1, testEndpointAddrs[1:2])
edsb.HandleEDSResponse(clab3.build())
scToRemove := <-cc.removeSubConnCh
if !reflect.DeepEqual(scToRemove, sc1) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
}
edsb.HandleSubConnStateChange(scToRemove, connectivity.Shutdown)
// Test pick with only the second subconn.
p3 := <-cc.newPickerCh
for i := 0; i < 5; i++ {
gotSC, _, _ := p3.Pick(context.Background(), balancer.PickOptions{})
if !reflect.DeepEqual(gotSC, sc2) {
t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc2)
}
}
// The same locality, replace backend.
clab4 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3])
edsb.HandleEDSResponse(clab4.build())
sc3 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc3, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc3, connectivity.Ready)
scToRemove = <-cc.removeSubConnCh
if !reflect.DeepEqual(scToRemove, sc2) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove)
}
edsb.HandleSubConnStateChange(scToRemove, connectivity.Shutdown)
// Test pick with only the third subconn.
p4 := <-cc.newPickerCh
for i := 0; i < 5; i++ {
gotSC, _, _ := p4.Pick(context.Background(), balancer.PickOptions{})
if !reflect.DeepEqual(gotSC, sc3) {
t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc3)
}
}
// The same locality, different drop rate, dropping 50%.
clab5 := newClusterLoadAssignmentBuilder(testClusterNames[0], []uint32{50})
clab5.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3])
edsb.HandleEDSResponse(clab5.build())
// Picks with drops.
p5 := <-cc.newPickerCh
for i := 0; i < 100; i++ {
_, _, err := p5.Pick(context.Background(), balancer.PickOptions{})
// TODO: the dropping algorithm needs a design. When the dropping algorithm
// is fixed, this test also needs fix.
if i < 50 && err == nil {
t.Errorf("The first 50%% picks should be drops, got error <nil>")
} else if i > 50 && err != nil {
t.Errorf("The second 50%% picks should be non-drops, got error %v", err)
}
}
}
// 2 locality
// - start with 2 locality
// - add locality
// - remove locality
// - address change for the <not-the-first> locality
// - update locality weight
func TestEDS_TwoLocalities(t *testing.T) {
cc := newTestClientConn(t)
edsb := NewXDSBalancer(cc)
// Two localities, each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
edsb.HandleEDSResponse(clab1.build())
sc1 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)
sc2 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)
// Test roundrobin with two subconns.
p1 := <-cc.newPickerCh
want := []balancer.SubConn{sc1, sc2}
if err := isRoundRobin(want, func() balancer.SubConn {
sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{})
return sc
}); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Add another locality, with one backend.
clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
clab2.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab2.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3])
edsb.HandleEDSResponse(clab2.build())
sc3 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc3, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc3, connectivity.Ready)
// Test roundrobin with three subconns.
p2 := <-cc.newPickerCh
want = []balancer.SubConn{sc1, sc2, sc3}
if err := isRoundRobin(want, func() balancer.SubConn {
sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{})
return sc
}); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Remove first locality.
clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab3.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3])
edsb.HandleEDSResponse(clab3.build())
scToRemove := <-cc.removeSubConnCh
if !reflect.DeepEqual(scToRemove, sc1) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
}
edsb.HandleSubConnStateChange(scToRemove, connectivity.Shutdown)
// Test pick with two subconns (without the first one).
p3 := <-cc.newPickerCh
want = []balancer.SubConn{sc2, sc3}
if err := isRoundRobin(want, func() balancer.SubConn {
sc, _, _ := p3.Pick(context.Background(), balancer.PickOptions{})
return sc
}); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Add a backend to the last locality.
clab4 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
clab4.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4])
edsb.HandleEDSResponse(clab4.build())
sc4 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc4, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc4, connectivity.Ready)
// Test pick with two subconns (without the first one).
p4 := <-cc.newPickerCh
// Locality-1 will be picked twice, and locality-2 will be picked twice.
// Locality-1 contains only sc2, locality-2 contains sc3 and sc4. So expect
// two sc2's and sc3, sc4.
want = []balancer.SubConn{sc2, sc2, sc3, sc4}
if err := isRoundRobin(want, func() balancer.SubConn {
sc, _, _ := p4.Pick(context.Background(), balancer.PickOptions{})
return sc
}); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Change weight of the locality[1].
clab5 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab5.addLocality(testSubZones[1], 2, testEndpointAddrs[1:2])
clab5.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4])
edsb.HandleEDSResponse(clab5.build())
// Test pick with two subconns different locality weight.
p5 := <-cc.newPickerCh
// Locality-1 will be picked four times, and locality-2 will be picked twice
// (weight 2 and 1). Locality-1 contains only sc2, locality-2 contains sc3 and
// sc4. So expect four sc2's and sc3, sc4.
want = []balancer.SubConn{sc2, sc2, sc2, sc2, sc3, sc4}
if err := isRoundRobin(want, func() balancer.SubConn {
sc, _, _ := p5.Pick(context.Background(), balancer.PickOptions{})
return sc
}); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
func init() {
balancer.Register(&testConstBalancerBuilder{})
}
var errTestConstPicker = fmt.Errorf("const picker error")
type testConstBalancerBuilder struct{}
func (*testConstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return &testConstBalancer{cc: cc}
}
func (*testConstBalancerBuilder) Name() string {
return "test-const-balancer"
}
type testConstBalancer struct {
cc balancer.ClientConn
}
func (tb *testConstBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
tb.cc.UpdateBalancerState(connectivity.Ready, &testConstPicker{err: errTestConstPicker})
}
func (tb *testConstBalancer) HandleResolvedAddrs([]resolver.Address, error) {
tb.cc.UpdateBalancerState(connectivity.Ready, &testConstPicker{err: errTestConstPicker})
}
func (*testConstBalancer) Close() {
}
type testConstPicker struct {
err error
sc balancer.SubConn
}
func (tcp *testConstPicker) Pick(ctx context.Context, opts balancer.PickOptions) (conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
if tcp.err != nil {
return nil, nil, tcp.err
}
return tcp.sc, nil, nil
}
// Create XDS balancer, and update sub-balancer before handling eds responses.
// Then switch between round-robin and test-const-balancer after handling first
// eds response.
func TestEDS_UpdateSubBalancerName(t *testing.T) {
cc := newTestClientConn(t)
edsb := NewXDSBalancer(cc)
t.Logf("update sub-balancer to test-const-balancer")
edsb.HandleChildPolicy("test-const-balancer", nil)
// Two localities, each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1])
clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2])
edsb.HandleEDSResponse(clab1.build())
p0 := <-cc.newPickerCh
for i := 0; i < 5; i++ {
_, _, err := p0.Pick(context.Background(), balancer.PickOptions{})
if !reflect.DeepEqual(err, errTestConstPicker) {
t.Fatalf("picker.Pick, got err %q, want err %q", err, errTestConstPicker)
}
}
t.Logf("update sub-balancer to round-robin")
edsb.HandleChildPolicy(roundrobin.Name, nil)
sc1 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)
sc2 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)
// Test roundrobin with two subconns.
p1 := <-cc.newPickerCh
want := []balancer.SubConn{sc1, sc2}
if err := isRoundRobin(want, func() balancer.SubConn {
sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{})
return sc
}); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
t.Logf("update sub-balancer to test-const-balancer")
edsb.HandleChildPolicy("test-const-balancer", nil)
for i := 0; i < 2; i++ {
scToRemove := <-cc.removeSubConnCh
if !reflect.DeepEqual(scToRemove, sc1) && !reflect.DeepEqual(scToRemove, sc2) {
t.Fatalf("RemoveSubConn, want (%v or %v), got %v", sc1, sc2, scToRemove)
}
edsb.HandleSubConnStateChange(scToRemove, connectivity.Shutdown)
}
p2 := <-cc.newPickerCh
for i := 0; i < 5; i++ {
_, _, err := p2.Pick(context.Background(), balancer.PickOptions{})
if !reflect.DeepEqual(err, errTestConstPicker) {
t.Fatalf("picker.Pick, got err %q, want err %q", err, errTestConstPicker)
}
}
t.Logf("update sub-balancer to round-robin")
edsb.HandleChildPolicy(roundrobin.Name, nil)
sc3 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc3, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc3, connectivity.Ready)
sc4 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc4, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc4, connectivity.Ready)
p3 := <-cc.newPickerCh
want = []balancer.SubConn{sc3, sc4}
if err := isRoundRobin(want, func() balancer.SubConn {
sc, _, _ := p3.Pick(context.Background(), balancer.PickOptions{})
return sc
}); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
func TestDropPicker(t *testing.T) {
const pickCount = 12
var constPicker = &testConstPicker{
sc: testSubConns[0],
}
tests := []struct {
name string
drops []*dropper
}{
{
name: "no drop",
drops: nil,
},
{
name: "one drop",
drops: []*dropper{
newDropper(1, 2),
},
},
{
name: "two drops",
drops: []*dropper{
newDropper(1, 3),
newDropper(1, 2),
},
},
{
name: "three drops",
drops: []*dropper{
newDropper(1, 3),
newDropper(1, 4),
newDropper(1, 2),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := newDropPicker(constPicker, tt.drops)
// scCount is the number of sc's returned by pick. The opposite of
// drop-count.
var (
scCount int
wantCount = pickCount
)
for _, dp := range tt.drops {
wantCount = wantCount * int(dp.denominator-dp.numerator) / int(dp.denominator)
}
for i := 0; i < pickCount; i++ {
_, _, err := p.Pick(context.Background(), balancer.PickOptions{})
if err == nil {
scCount++
}
}
if scCount != (wantCount) {
t.Errorf("drops: %+v, scCount %v, wantCount %v", tt.drops, scCount, wantCount)
}
})
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edsbalancer
import (
"sync"
)
type dropper struct {
// Drop rate will be numerator/denominator.
numerator uint32
denominator uint32
mu sync.Mutex
i uint32
}
func newDropper(numerator, denominator uint32) *dropper {
return &dropper{
numerator: numerator,
denominator: denominator,
}
}
func (d *dropper) drop() (ret bool) {
d.mu.Lock()
defer d.mu.Unlock()
// TODO: the drop algorithm needs a design.
// Currently, for drop rate 3/5:
// 0 1 2 3 4
// d d d n n
if d.i < d.numerator {
ret = true
}
d.i++
if d.i >= d.denominator {
d.i = 0
}
return
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edsbalancer
import (
"testing"
)
func TestDropper(t *testing.T) {
const repeat = 2
type args struct {
numerator uint32
denominator uint32
}
tests := []struct {
name string
args args
}{
{
name: "2_3",
args: args{
numerator: 2,
denominator: 3,
},
},
{
name: "4_8",
args: args{
numerator: 4,
denominator: 8,
},
},
{
name: "7_20",
args: args{
numerator: 7,
denominator: 20,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := newDropper(tt.args.numerator, tt.args.denominator)
var (
dCount int
wantCount = int(tt.args.numerator) * repeat
loopCount = int(tt.args.denominator) * repeat
)
for i := 0; i < loopCount; i++ {
if d.drop() {
dCount++
}
}
if dCount != (wantCount) {
t.Errorf("with numerator %v, denominator %v repeat %v, got drop count: %v, want %v",
tt.args.numerator, tt.args.denominator, repeat, dCount, wantCount)
}
})
}
}

View File

@ -48,7 +48,9 @@ var (
// TODO: if later we make startupTimeout configurable through BuildOptions(maybe?), then we can remove // TODO: if later we make startupTimeout configurable through BuildOptions(maybe?), then we can remove
// this field and configure through BuildOptions instead. // this field and configure through BuildOptions instead.
startupTimeout = defaultTimeout startupTimeout = defaultTimeout
newEDSBalancer = edsbalancer.NewXDSBalancer newEDSBalancer = func(cc balancer.ClientConn) edsBalancerInterface {
return edsbalancer.NewXDSBalancer(cc)
}
) )
func init() { func init() {
@ -86,13 +88,19 @@ func (b *xdsBalancerBuilder) Name() string {
return xdsName return xdsName
} }
// EdsBalancer defines the interface that edsBalancer must implement to communicate with xdsBalancer. // edsBalancerInterface defines the interface that edsBalancer must implement to
type EdsBalancer interface { // communicate with xdsBalancer.
balancer.Balancer //
// It's implemented by the real eds balancer and a fake testing eds balancer.
type edsBalancerInterface interface {
// HandleEDSResponse passes the received EDS message from traffic director to eds balancer. // HandleEDSResponse passes the received EDS message from traffic director to eds balancer.
HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment)
// HandleChildPolicy updates the eds balancer the intra-cluster load balancing policy to use. // HandleChildPolicy updates the eds balancer the intra-cluster load balancing policy to use.
HandleChildPolicy(name string, config json.RawMessage) HandleChildPolicy(name string, config json.RawMessage)
// HandleSubConnStateChange handles state change for SubConn.
HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State)
// Close closes the eds balancer.
Close()
} }
// xdsBalancer manages xdsClient and the actual balancer that does load balancing (either edsBalancer, // xdsBalancer manages xdsClient and the actual balancer that does load balancing (either edsBalancer,
@ -116,7 +124,7 @@ type xdsBalancer struct {
client *client // may change when passed a different service config client *client // may change when passed a different service config
config *xdsConfig // may change when passed a different service config config *xdsConfig // may change when passed a different service config
xdsLB EdsBalancer xdsLB edsBalancerInterface
fallbackLB balancer.Balancer fallbackLB balancer.Balancer
fallbackInitData *addressUpdate // may change when HandleResolved address is called fallbackInitData *addressUpdate // may change when HandleResolved address is called
} }
@ -447,7 +455,7 @@ func (x *xdsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() {
x.fallbackLB.Close() x.fallbackLB.Close()
x.fallbackLB = nil x.fallbackLB = nil
} }
x.xdsLB = newEDSBalancer(x.cc).(EdsBalancer) x.xdsLB = newEDSBalancer(x.cc)
if x.config.ChildPolicy != nil { if x.config.ChildPolicy != nil {
x.xdsLB.HandleChildPolicy(x.config.ChildPolicy.Name, x.config.ChildPolicy.Config) x.xdsLB.HandleChildPolicy(x.config.ChildPolicy.Name, x.config.ChildPolicy.Config)
} }

View File

@ -30,7 +30,6 @@ import (
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/xds/edsbalancer"
"google.golang.org/grpc/connectivity" "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
) )
@ -204,10 +203,6 @@ func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state co
f.subconnStateChange <- &scStateChange{sc: sc, state: state} f.subconnStateChange <- &scStateChange{sc: sc, state: state}
} }
func (f *fakeEDSBalancer) HandleResolvedAddrs([]resolver.Address, error) {
panic("implement me")
}
func (f *fakeEDSBalancer) Close() { func (f *fakeEDSBalancer) Close() {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
@ -225,7 +220,7 @@ func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage)
} }
} }
func newFakeEDSBalancer(cc balancer.ClientConn) interface{} { func newFakeEDSBalancer(cc balancer.ClientConn) edsBalancerInterface {
lb := &fakeEDSBalancer{ lb := &fakeEDSBalancer{
cc: cc, cc: cc,
edsChan: make(chan *xdspb.ClusterLoadAssignment, 10), edsChan: make(chan *xdspb.ClusterLoadAssignment, 10),
@ -286,10 +281,11 @@ func (s) TestXdsBalanceHandleResolvedAddrs(t *testing.T) {
func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) { func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) {
startupTimeout = 500 * time.Millisecond startupTimeout = 500 * time.Millisecond
originalNewEDSBalancer := newEDSBalancer
newEDSBalancer = newFakeEDSBalancer newEDSBalancer = newFakeEDSBalancer
defer func() { defer func() {
startupTimeout = defaultTimeout startupTimeout = defaultTimeout
newEDSBalancer = edsbalancer.NewXDSBalancer newEDSBalancer = originalNewEDSBalancer
}() }()
builder := balancer.Get("xds") builder := balancer.Get("xds")
@ -361,9 +357,10 @@ func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) {
// switch child policy, lb stays the same // switch child policy, lb stays the same
// cds->eds or eds -> cds, restart xdsClient, lb stays the same // cds->eds or eds -> cds, restart xdsClient, lb stays the same
func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) { func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) {
originalNewEDSBalancer := newEDSBalancer
newEDSBalancer = newFakeEDSBalancer newEDSBalancer = newFakeEDSBalancer
defer func() { defer func() {
newEDSBalancer = edsbalancer.NewXDSBalancer newEDSBalancer = originalNewEDSBalancer
}() }()
builder := balancer.Get("xds") builder := balancer.Get("xds")
@ -447,10 +444,12 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) {
// not in fallback mode, overwrite fallback info. // not in fallback mode, overwrite fallback info.
// in fallback mode, update config or switch balancer. // in fallback mode, update config or switch balancer.
func (s) TestXdsBalanceHandleBalancerConfigFallbackUpdate(t *testing.T) { func (s) TestXdsBalanceHandleBalancerConfigFallbackUpdate(t *testing.T) {
originalNewEDSBalancer := newEDSBalancer
newEDSBalancer = newFakeEDSBalancer newEDSBalancer = newFakeEDSBalancer
defer func() { defer func() {
newEDSBalancer = edsbalancer.NewXDSBalancer newEDSBalancer = originalNewEDSBalancer
}() }()
builder := balancer.Get("xds") builder := balancer.Get("xds")
cc := newTestClientConn() cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer) lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
@ -525,10 +524,12 @@ func (s) TestXdsBalanceHandleBalancerConfigFallbackUpdate(t *testing.T) {
} }
func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) { func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) {
originalNewEDSBalancer := newEDSBalancer
newEDSBalancer = newFakeEDSBalancer newEDSBalancer = newFakeEDSBalancer
defer func() { defer func() {
newEDSBalancer = edsbalancer.NewXDSBalancer newEDSBalancer = originalNewEDSBalancer
}() }()
builder := balancer.Get("xds") builder := balancer.Get("xds")
cc := newTestClientConn() cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer) lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
@ -605,10 +606,12 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) {
} }
func (s) TestXdsBalancerFallbackSignalFromEdsBalancer(t *testing.T) { func (s) TestXdsBalancerFallbackSignalFromEdsBalancer(t *testing.T) {
originalNewEDSBalancer := newEDSBalancer
newEDSBalancer = newFakeEDSBalancer newEDSBalancer = newFakeEDSBalancer
defer func() { defer func() {
newEDSBalancer = edsbalancer.NewXDSBalancer newEDSBalancer = originalNewEDSBalancer
}() }()
builder := balancer.Get("xds") builder := balancer.Get("xds")
cc := newTestClientConn() cc := newTestClientConn()
lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer) lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)