From 914c52b240b05e4562b1d09e5e7cf3c57495d442 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 28 Feb 2019 16:19:50 -0800 Subject: [PATCH] xds: eds balancer (#2624) The balancer that handles eds response and does two level balancing (locality level and endpoint level). --- balancer/xds/edsbalancer/balancergroup.go | 7 +- balancer/xds/edsbalancer/edsbalancer.go | 289 +++++++++- balancer/xds/edsbalancer/edsbalancer_test.go | 526 +++++++++++++++++++ balancer/xds/edsbalancer/util.go | 56 ++ balancer/xds/edsbalancer/util_test.go | 76 +++ balancer/xds/xds.go | 20 +- balancer/xds/xds_test.go | 25 +- 7 files changed, 958 insertions(+), 41 deletions(-) create mode 100644 balancer/xds/edsbalancer/edsbalancer_test.go create mode 100644 balancer/xds/edsbalancer/util.go create mode 100644 balancer/xds/edsbalancer/util_test.go diff --git a/balancer/xds/edsbalancer/balancergroup.go b/balancer/xds/edsbalancer/balancergroup.go index a77875bd..7957708b 100644 --- a/balancer/xds/edsbalancer/balancergroup.go +++ b/balancer/xds/edsbalancer/balancergroup.go @@ -14,9 +14,6 @@ * limitations under the License. */ -// TODO: move package comment to edsbalancer.go. - -// Package edsbalancer implements balancer generated from an eds response. package edsbalancer import ( @@ -238,6 +235,10 @@ func (bg *balancerGroup) close() { for _, b := range bg.idToBalancer { b.Close() } + // Also remove all SubConns. + for sc := range bg.scToID { + bg.cc.RemoveSubConn(sc) + } bg.mu.Unlock() } diff --git a/balancer/xds/edsbalancer/edsbalancer.go b/balancer/xds/edsbalancer/edsbalancer.go index 6c1da917..8b59c2cf 100644 --- a/balancer/xds/edsbalancer/edsbalancer.go +++ b/balancer/xds/edsbalancer/edsbalancer.go @@ -1,5 +1,4 @@ /* - * * Copyright 2019 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,46 +12,294 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and * limitations under the License. - * */ +// Package edsbalancer implements a balancer to handle EDS responses. package edsbalancer -// TODO: this file is used as a place holder. It should be deleted after edsbalancer implementation -// is merged. - import ( + "context" "encoding/json" + "fmt" + "reflect" + "sync" - v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + xdstypepb "github.com/envoyproxy/go-control-plane/envoy/type" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/grpclog" "google.golang.org/grpc/resolver" + "google.golang.org/grpc/status" ) -type dummyEdsBalancer struct{} - -func (d *dummyEdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { - panic("implement me") +type localityConfig struct { + weight uint32 + addrs []resolver.Address } -func (d *dummyEdsBalancer) HandleResolvedAddrs([]resolver.Address, error) { - panic("implement me") +// EDSBalancer does load balancing based on the EDS responses. Note that it +// doesn't implement the balancer interface. It's intended to be used by a high +// level balancer implementation. +// +// The localities are picked as weighted round robin. A configurable child +// policy is used to manage endpoints in each locality. +type EDSBalancer struct { + balancer.ClientConn + + bg *balancerGroup + subBalancerBuilder balancer.Builder + lidToConfig map[string]*localityConfig + + pickerMu sync.Mutex + drops []*dropper + innerPicker balancer.Picker // The picker without drop support. + innerState connectivity.State // The state of the picker. } -func (d *dummyEdsBalancer) Close() { - panic("implement me") +// NewXDSBalancer creates a new EDSBalancer.
+func NewXDSBalancer(cc balancer.ClientConn) *EDSBalancer { + xdsB := &EDSBalancer{ + ClientConn: cc, + subBalancerBuilder: balancer.Get(roundrobin.Name), + + lidToConfig: make(map[string]*localityConfig), + } + // Don't start balancer group here. Start it when handling the first EDS + // response. Otherwise the balancer group will be started with round-robin, + // and if users specify a different sub-balancer, all balancers in balancer + // group will be closed and recreated when sub-balancer update happens. + return xdsB } -func (d *dummyEdsBalancer) HandleEDSResponse(edsResp *v2.ClusterLoadAssignment) { - panic("implement me") +// HandleChildPolicy updates the child balancers handling endpoints. Child +// policy is roundrobin by default. If the specified balancer is not installed, +// the old child balancer will be used. +// +// HandleChildPolicy and HandleEDSResponse must be called by the same goroutine. +func (xdsB *EDSBalancer) HandleChildPolicy(name string, config json.RawMessage) { + // name could come from cdsResp.GetLbPolicy().String(). LbPolicy.String() + // are all UPPER_CASE with underscore. + // + // No conversion is needed here because balancer package converts all names + // into lower_case before registering/looking up. + xdsB.updateSubBalancerName(name) + // TODO: (eds) send balancer config to the new child balancers. } -func (d *dummyEdsBalancer) HandleChildPolicy(name string, config json.RawMessage) { - panic("implement me") +func (xdsB *EDSBalancer) updateSubBalancerName(subBalancerName string) { + if xdsB.subBalancerBuilder.Name() == subBalancerName { + return + } + newSubBalancerBuilder := balancer.Get(subBalancerName) + if newSubBalancerBuilder == nil { + grpclog.Infof("EDSBalancer: failed to find balancer with name %q, keep using %q", subBalancerName, xdsB.subBalancerBuilder.Name()) + return + } + xdsB.subBalancerBuilder = newSubBalancerBuilder + if xdsB.bg != nil { + // xdsB.bg == nil until the first EDS response is handled. 
There's no + // need to update balancer group before that. + for id, config := range xdsB.lidToConfig { + // TODO: (eds) add support to balancer group to support smoothly + // switching sub-balancers (keep old balancer around until new + // balancer becomes ready). + xdsB.bg.remove(id) + xdsB.bg.add(id, config.weight, xdsB.subBalancerBuilder) + xdsB.bg.handleResolvedAddrs(id, config.addrs) + } + } } -// NewXDSBalancer creates an edsBalancer -func NewXDSBalancer(cc balancer.ClientConn) interface{} { - return &dummyEdsBalancer{} +// updateDrops compares new drop policies with the old. If they are different, +// it updates the drop policies and sends ClientConn an updated picker. +func (xdsB *EDSBalancer) updateDrops(dropPolicies []*xdspb.ClusterLoadAssignment_Policy_DropOverload) { + var ( + newDrops []*dropper + dropsChanged bool + ) + for i, dropPolicy := range dropPolicies { + percentage := dropPolicy.GetDropPercentage() + var ( + numerator = percentage.GetNumerator() + denominator uint32 + ) + switch percentage.GetDenominator() { + case xdstypepb.FractionalPercent_HUNDRED: + denominator = 100 + case xdstypepb.FractionalPercent_TEN_THOUSAND: + denominator = 10000 + case xdstypepb.FractionalPercent_MILLION: + denominator = 1000000 + } + newDrops = append(newDrops, newDropper(numerator, denominator)) + + // Reading xdsB.drops below doesn't need the mutex because it can only + // be updated by the code that follows. + if dropsChanged { + continue + } + if i >= len(xdsB.drops) { + dropsChanged = true + continue + } + if oldDrop := xdsB.drops[i]; numerator != oldDrop.numerator || denominator != oldDrop.denominator { + dropsChanged = true + } + } + if dropsChanged { + xdsB.pickerMu.Lock() + xdsB.drops = newDrops + if xdsB.innerPicker != nil { + // Update picker with old inner picker, new drops.
+ xdsB.ClientConn.UpdateBalancerState(xdsB.innerState, newDropPicker(xdsB.innerPicker, newDrops)) + } + xdsB.pickerMu.Unlock() + } +} + +// HandleEDSResponse handles the EDS response and creates/deletes localities and +// SubConns. It also handles drops. +// +// HandleChildPolicy and HandleEDSResponse must be called by the same goroutine. +func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment) { + // Create balancer group if it's never created (this is the first EDS + // response). + if xdsB.bg == nil { + xdsB.bg = newBalancerGroup(xdsB) + } + + // TODO: Unhandled fields from EDS response: + // - edsResp.GetPolicy().GetOverprovisioningFactor() + // - locality.GetPriority() + // - lbEndpoint.GetMetadata(): contains BNS name, send to sub-balancers + // - as service config or as resolved address + // - if socketAddress is not ip:port + // - socketAddress.GetNamedPort(), socketAddress.GetResolverName() + // - resolve endpoint's name with another resolver + + xdsB.updateDrops(edsResp.GetPolicy().GetDropOverloads()) + + // newLocalitiesSet contains all names of localities in the new EDS response. + // It's used to delete localities that are removed in the new EDS response. + newLocalitiesSet := make(map[string]struct{}) + for _, locality := range edsResp.Endpoints { + // One balancer for each locality. + + l := locality.GetLocality() + if l == nil { + grpclog.Warningf("xds: received LocalityLbEndpoints with Locality") + continue + } + lid := fmt.Sprintf("%s-%s-%s", l.Region, l.Zone, l.SubZone) + newLocalitiesSet[lid] = struct{}{} + + newWeight := locality.GetLoadBalancingWeight().GetValue() + if newWeight == 0 { + // Weight can never be 0.
+ newWeight = 1 + } + + var newAddrs []resolver.Address + for _, lbEndpoint := range locality.GetLbEndpoints() { + socketAddress := lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress() + newAddrs = append(newAddrs, resolver.Address{ + Addr: fmt.Sprintf("%s:%d", socketAddress.GetAddress(), socketAddress.GetPortValue()), + }) + } + var weightChanged, addrsChanged bool + config, ok := xdsB.lidToConfig[lid] + if !ok { + // A new balancer, add it to balancer group and balancer map. + xdsB.bg.add(lid, newWeight, xdsB.subBalancerBuilder) + config = &localityConfig{ + weight: newWeight, + } + xdsB.lidToConfig[lid] = config + + // weightChanged is false for new locality, because there's no need to + // update weight in bg. + addrsChanged = true + } else { + // Compare weight and addrs. + if config.weight != newWeight { + weightChanged = true + } + if !reflect.DeepEqual(config.addrs, newAddrs) { + addrsChanged = true + } + } + + if weightChanged { + config.weight = newWeight + xdsB.bg.changeWeight(lid, newWeight) + } + + if addrsChanged { + config.addrs = newAddrs + xdsB.bg.handleResolvedAddrs(lid, newAddrs) + } + } + + // Delete localities that are removed in the latest response. + for lid := range xdsB.lidToConfig { + if _, ok := newLocalitiesSet[lid]; !ok { + xdsB.bg.remove(lid) + delete(xdsB.lidToConfig, lid) + } + } +} + +// HandleSubConnStateChange handles the state change and updates pickers accordingly. +func (xdsB *EDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { + xdsB.bg.handleSubConnStateChange(sc, s) +} + +// UpdateBalancerState overrides balancer.ClientConn to wrap the picker in a +// dropPicker. +func (xdsB *EDSBalancer) UpdateBalancerState(s connectivity.State, p balancer.Picker) { + xdsB.pickerMu.Lock() + defer xdsB.pickerMu.Unlock() + xdsB.innerPicker = p + xdsB.innerState = s + // Don't reset drops when it's a state change.
+ xdsB.ClientConn.UpdateBalancerState(s, newDropPicker(p, xdsB.drops)) +} + +// Close closes the balancer. +func (xdsB *EDSBalancer) Close() { + xdsB.bg.close() +} + +type dropPicker struct { + drops []*dropper + p balancer.Picker +} + +func newDropPicker(p balancer.Picker, drops []*dropper) *dropPicker { + return &dropPicker{ + drops: drops, + p: p, + } +} + +func (d *dropPicker) Pick(ctx context.Context, opts balancer.PickOptions) (conn balancer.SubConn, done func(balancer.DoneInfo), err error) { + var drop bool + for _, dp := range d.drops { + // It's necessary to call drop on all droppers if the droppers are + // stateful. For example, if the second drop only drops 1/2, and only + // drops even number picks, we need to call its drop() even if the + // first dropper already returned true. + // + // It won't be necessary if droppers are stateless, like tossing a coin. + drop = drop || dp.drop() + } + if drop { + return nil, nil, status.Errorf(codes.Unavailable, "RPC is dropped") + } + // TODO: (eds) don't drop unless the inner picker is READY. Similar to + // https://github.com/grpc/grpc-go/issues/2622. + return d.p.Pick(ctx, opts) } diff --git a/balancer/xds/edsbalancer/edsbalancer_test.go b/balancer/xds/edsbalancer/edsbalancer_test.go new file mode 100644 index 00000000..40ae93c2 --- /dev/null +++ b/balancer/xds/edsbalancer/edsbalancer_test.go @@ -0,0 +1,526 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package edsbalancer + +import ( + "context" + "fmt" + "net" + "reflect" + "strconv" + "testing" + + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" + xdstypepb "github.com/envoyproxy/go-control-plane/envoy/type" + typespb "github.com/gogo/protobuf/types" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/resolver" +) + +var ( + testClusterNames = []string{"test-cluster-1", "test-cluster-2"} + testSubZones = []string{"I", "II", "III", "IV"} + testEndpointAddrs = []string{"1.1.1.1:1", "2.2.2.2:2", "3.3.3.3:3", "4.4.4.4:4"} +) + +type clusterLoadAssignmentBuilder struct { + v *xdspb.ClusterLoadAssignment +} + +func newClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32) *clusterLoadAssignmentBuilder { + var drops []*xdspb.ClusterLoadAssignment_Policy_DropOverload + for i, d := range dropPercents { + drops = append(drops, &xdspb.ClusterLoadAssignment_Policy_DropOverload{ + Category: fmt.Sprintf("test-drop-%d", i), + DropPercentage: &xdstypepb.FractionalPercent{ + Numerator: d, + Denominator: xdstypepb.FractionalPercent_HUNDRED, + }, + }) + } + + return &clusterLoadAssignmentBuilder{ + v: &xdspb.ClusterLoadAssignment{ + ClusterName: clusterName, + Policy: &xdspb.ClusterLoadAssignment_Policy{ + DropOverloads: drops, + }, + }, + } +} + +func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uint32, addrsWithPort []string) { + var lbEndPoints []endpointpb.LbEndpoint + for _, a := range addrsWithPort { + host, portStr, err := net.SplitHostPort(a) + if err != nil { + panic("failed to split " + a) + } + port, err := strconv.Atoi(portStr) + if err != nil { + panic("failed to atoi " + portStr) + } + + lbEndPoints = append(lbEndPoints, endpointpb.LbEndpoint{ + HostIdentifier: 
&endpointpb.LbEndpoint_Endpoint{ + Endpoint: &endpointpb.Endpoint{ + Address: &corepb.Address{ + Address: &corepb.Address_SocketAddress{ + SocketAddress: &corepb.SocketAddress{ + Protocol: corepb.TCP, + Address: host, + PortSpecifier: &corepb.SocketAddress_PortValue{ + PortValue: uint32(port)}}}}}}}, + ) + } + + clab.v.Endpoints = append(clab.v.Endpoints, endpointpb.LocalityLbEndpoints{ + Locality: &corepb.Locality{ + Region: "", + Zone: "", + SubZone: subzone, + }, + LbEndpoints: lbEndPoints, + LoadBalancingWeight: &typespb.UInt32Value{Value: weight}, + }) +} + +func (clab *clusterLoadAssignmentBuilder) build() *xdspb.ClusterLoadAssignment { + return clab.v +} + +// One locality +// - add backend +// - remove backend +// - replace backend +// - change drop rate +func TestEDS_OneLocality(t *testing.T) { + cc := newTestClientConn(t) + edsb := NewXDSBalancer(cc) + + // One locality with one backend. + clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1]) + edsb.HandleEDSResponse(clab1.build()) + + sc1 := <-cc.newSubConnCh + edsb.HandleSubConnStateChange(sc1, connectivity.Connecting) + edsb.HandleSubConnStateChange(sc1, connectivity.Ready) + + // Pick with only the first backend. + p1 := <-cc.newPickerCh + for i := 0; i < 5; i++ { + gotSC, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) + if !reflect.DeepEqual(gotSC, sc1) { + t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc1) + } + } + + // The same locality, add one more backend. + clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:2]) + edsb.HandleEDSResponse(clab2.build()) + + sc2 := <-cc.newSubConnCh + edsb.HandleSubConnStateChange(sc2, connectivity.Connecting) + edsb.HandleSubConnStateChange(sc2, connectivity.Ready) + + // Test roundrobin with two subconns. 
+ p2 := <-cc.newPickerCh + want := []balancer.SubConn{sc1, sc2} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } + + // The same locality, delete first backend. + clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab3.addLocality(testSubZones[0], 1, testEndpointAddrs[1:2]) + edsb.HandleEDSResponse(clab3.build()) + + scToRemove := <-cc.removeSubConnCh + if !reflect.DeepEqual(scToRemove, sc1) { + t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove) + } + edsb.HandleSubConnStateChange(scToRemove, connectivity.Shutdown) + + // Test pick with only the second subconn. + p3 := <-cc.newPickerCh + for i := 0; i < 5; i++ { + gotSC, _, _ := p3.Pick(context.Background(), balancer.PickOptions{}) + if !reflect.DeepEqual(gotSC, sc2) { + t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc2) + } + } + + // The same locality, replace backend. + clab4 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab4.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3]) + edsb.HandleEDSResponse(clab4.build()) + + sc3 := <-cc.newSubConnCh + edsb.HandleSubConnStateChange(sc3, connectivity.Connecting) + edsb.HandleSubConnStateChange(sc3, connectivity.Ready) + scToRemove = <-cc.removeSubConnCh + if !reflect.DeepEqual(scToRemove, sc2) { + t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove) + } + edsb.HandleSubConnStateChange(scToRemove, connectivity.Shutdown) + + // Test pick with only the third subconn. + p4 := <-cc.newPickerCh + for i := 0; i < 5; i++ { + gotSC, _, _ := p4.Pick(context.Background(), balancer.PickOptions{}) + if !reflect.DeepEqual(gotSC, sc3) { + t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc3) + } + } + + // The same locality, different drop rate, dropping 50%. 
+ clab5 := newClusterLoadAssignmentBuilder(testClusterNames[0], []uint32{50}) + clab5.addLocality(testSubZones[0], 1, testEndpointAddrs[2:3]) + edsb.HandleEDSResponse(clab5.build()) + + // Picks with drops. + p5 := <-cc.newPickerCh + for i := 0; i < 100; i++ { + _, _, err := p5.Pick(context.Background(), balancer.PickOptions{}) + // TODO: the dropping algorithm needs a design. When the dropping algorithm + // is fixed, this test also needs fix. + if i < 50 && err == nil { + t.Errorf("The first 50%% picks should be drops, got error ") + } else if i > 50 && err != nil { + t.Errorf("The second 50%% picks should be non-drops, got error %v", err) + } + } +} + +// 2 locality +// - start with 2 locality +// - add locality +// - remove locality +// - address change for the locality +// - update locality weight +func TestEDS_TwoLocalities(t *testing.T) { + cc := newTestClientConn(t) + edsb := NewXDSBalancer(cc) + + // Two localities, each with one backend. + clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1]) + clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2]) + edsb.HandleEDSResponse(clab1.build()) + + sc1 := <-cc.newSubConnCh + edsb.HandleSubConnStateChange(sc1, connectivity.Connecting) + edsb.HandleSubConnStateChange(sc1, connectivity.Ready) + sc2 := <-cc.newSubConnCh + edsb.HandleSubConnStateChange(sc2, connectivity.Connecting) + edsb.HandleSubConnStateChange(sc2, connectivity.Ready) + + // Test roundrobin with two subconns. + p1 := <-cc.newPickerCh + want := []balancer.SubConn{sc1, sc2} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } + + // Add another locality, with one backend. 
+ clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab2.addLocality(testSubZones[0], 1, testEndpointAddrs[:1]) + clab2.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2]) + clab2.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3]) + edsb.HandleEDSResponse(clab2.build()) + + sc3 := <-cc.newSubConnCh + edsb.HandleSubConnStateChange(sc3, connectivity.Connecting) + edsb.HandleSubConnStateChange(sc3, connectivity.Ready) + + // Test roundrobin with three subconns. + p2 := <-cc.newPickerCh + want = []balancer.SubConn{sc1, sc2, sc3} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } + + // Remove first locality. + clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab3.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2]) + clab3.addLocality(testSubZones[2], 1, testEndpointAddrs[2:3]) + edsb.HandleEDSResponse(clab3.build()) + + scToRemove := <-cc.removeSubConnCh + if !reflect.DeepEqual(scToRemove, sc1) { + t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove) + } + edsb.HandleSubConnStateChange(scToRemove, connectivity.Shutdown) + + // Test pick with two subconns (without the first one). + p3 := <-cc.newPickerCh + want = []balancer.SubConn{sc2, sc3} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p3.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } + + // Add a backend to the last locality. 
+ clab4 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab4.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2]) + clab4.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4]) + edsb.HandleEDSResponse(clab4.build()) + + sc4 := <-cc.newSubConnCh + edsb.HandleSubConnStateChange(sc4, connectivity.Connecting) + edsb.HandleSubConnStateChange(sc4, connectivity.Ready) + + // Test pick with two subconns (without the first one). + p4 := <-cc.newPickerCh + // Locality-1 will be picked twice, and locality-2 will be picked twice. + // Locality-1 contains only sc2, locality-2 contains sc3 and sc4. So expect + // two sc2's and sc3, sc4. + want = []balancer.SubConn{sc2, sc2, sc3, sc4} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p4.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } + + // Change weight of the locality[1]. + clab5 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab5.addLocality(testSubZones[1], 2, testEndpointAddrs[1:2]) + clab5.addLocality(testSubZones[2], 1, testEndpointAddrs[2:4]) + edsb.HandleEDSResponse(clab5.build()) + + // Test pick with two subconns different locality weight. + p5 := <-cc.newPickerCh + // Locality-1 will be picked four times, and locality-2 will be picked twice + // (weight 2 and 1). Locality-1 contains only sc2, locality-2 contains sc3 and + // sc4. So expect four sc2's and sc3, sc4. 
+ want = []balancer.SubConn{sc2, sc2, sc2, sc2, sc3, sc4} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p5.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } +} + +func init() { + balancer.Register(&testConstBalancerBuilder{}) +} + +var errTestConstPicker = fmt.Errorf("const picker error") + +type testConstBalancerBuilder struct{} + +func (*testConstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + return &testConstBalancer{cc: cc} +} + +func (*testConstBalancerBuilder) Name() string { + return "test-const-balancer" +} + +type testConstBalancer struct { + cc balancer.ClientConn +} + +func (tb *testConstBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { + tb.cc.UpdateBalancerState(connectivity.Ready, &testConstPicker{err: errTestConstPicker}) +} + +func (tb *testConstBalancer) HandleResolvedAddrs([]resolver.Address, error) { + tb.cc.UpdateBalancerState(connectivity.Ready, &testConstPicker{err: errTestConstPicker}) +} + +func (*testConstBalancer) Close() { +} + +type testConstPicker struct { + err error + sc balancer.SubConn +} + +func (tcp *testConstPicker) Pick(ctx context.Context, opts balancer.PickOptions) (conn balancer.SubConn, done func(balancer.DoneInfo), err error) { + if tcp.err != nil { + return nil, nil, tcp.err + } + return tcp.sc, nil, nil +} + +// Create XDS balancer, and update sub-balancer before handling eds responses. +// Then switch between round-robin and test-const-balancer after handling first +// eds response. +func TestEDS_UpdateSubBalancerName(t *testing.T) { + cc := newTestClientConn(t) + edsb := NewXDSBalancer(cc) + + t.Logf("update sub-balancer to test-const-balancer") + edsb.HandleChildPolicy("test-const-balancer", nil) + + // Two localities, each with one backend. 
+ clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1.addLocality(testSubZones[0], 1, testEndpointAddrs[:1]) + clab1.addLocality(testSubZones[1], 1, testEndpointAddrs[1:2]) + edsb.HandleEDSResponse(clab1.build()) + + p0 := <-cc.newPickerCh + for i := 0; i < 5; i++ { + _, _, err := p0.Pick(context.Background(), balancer.PickOptions{}) + if !reflect.DeepEqual(err, errTestConstPicker) { + t.Fatalf("picker.Pick, got err %q, want err %q", err, errTestConstPicker) + } + } + + t.Logf("update sub-balancer to round-robin") + edsb.HandleChildPolicy(roundrobin.Name, nil) + + sc1 := <-cc.newSubConnCh + edsb.HandleSubConnStateChange(sc1, connectivity.Connecting) + edsb.HandleSubConnStateChange(sc1, connectivity.Ready) + sc2 := <-cc.newSubConnCh + edsb.HandleSubConnStateChange(sc2, connectivity.Connecting) + edsb.HandleSubConnStateChange(sc2, connectivity.Ready) + + // Test roundrobin with two subconns. + p1 := <-cc.newPickerCh + want := []balancer.SubConn{sc1, sc2} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } + + t.Logf("update sub-balancer to test-const-balancer") + edsb.HandleChildPolicy("test-const-balancer", nil) + + for i := 0; i < 2; i++ { + scToRemove := <-cc.removeSubConnCh + if !reflect.DeepEqual(scToRemove, sc1) && !reflect.DeepEqual(scToRemove, sc2) { + t.Fatalf("RemoveSubConn, want (%v or %v), got %v", sc1, sc2, scToRemove) + } + edsb.HandleSubConnStateChange(scToRemove, connectivity.Shutdown) + } + + p2 := <-cc.newPickerCh + for i := 0; i < 5; i++ { + _, _, err := p2.Pick(context.Background(), balancer.PickOptions{}) + if !reflect.DeepEqual(err, errTestConstPicker) { + t.Fatalf("picker.Pick, got err %q, want err %q", err, errTestConstPicker) + } + } + + t.Logf("update sub-balancer to round-robin") + edsb.HandleChildPolicy(roundrobin.Name, nil) + + sc3 := <-cc.newSubConnCh + 
edsb.HandleSubConnStateChange(sc3, connectivity.Connecting) + edsb.HandleSubConnStateChange(sc3, connectivity.Ready) + sc4 := <-cc.newSubConnCh + edsb.HandleSubConnStateChange(sc4, connectivity.Connecting) + edsb.HandleSubConnStateChange(sc4, connectivity.Ready) + + p3 := <-cc.newPickerCh + want = []balancer.SubConn{sc3, sc4} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p3.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } +} + +func TestDropPicker(t *testing.T) { + const pickCount = 12 + var constPicker = &testConstPicker{ + sc: testSubConns[0], + } + + tests := []struct { + name string + drops []*dropper + }{ + { + name: "no drop", + drops: nil, + }, + { + name: "one drop", + drops: []*dropper{ + newDropper(1, 2), + }, + }, + { + name: "two drops", + drops: []*dropper{ + newDropper(1, 3), + newDropper(1, 2), + }, + }, + { + name: "three drops", + drops: []*dropper{ + newDropper(1, 3), + newDropper(1, 4), + newDropper(1, 2), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + p := newDropPicker(constPicker, tt.drops) + + // scCount is the number of sc's returned by pick. The opposite of + // drop-count. + var ( + scCount int + wantCount = pickCount + ) + for _, dp := range tt.drops { + wantCount = wantCount * int(dp.denominator-dp.numerator) / int(dp.denominator) + } + + for i := 0; i < pickCount; i++ { + _, _, err := p.Pick(context.Background(), balancer.PickOptions{}) + if err == nil { + scCount++ + } + } + + if scCount != (wantCount) { + t.Errorf("drops: %+v, scCount %v, wantCount %v", tt.drops, scCount, wantCount) + } + }) + } +} diff --git a/balancer/xds/edsbalancer/util.go b/balancer/xds/edsbalancer/util.go new file mode 100644 index 00000000..46c6283f --- /dev/null +++ b/balancer/xds/edsbalancer/util.go @@ -0,0 +1,56 @@ +/* + * Copyright 2019 gRPC authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package edsbalancer + +import ( + "sync" +) + +type dropper struct { + // Drop rate will be numerator/denominator. + numerator uint32 + denominator uint32 + + mu sync.Mutex + i uint32 +} + +func newDropper(numerator, denominator uint32) *dropper { + return &dropper{ + numerator: numerator, + denominator: denominator, + } +} + +func (d *dropper) drop() (ret bool) { + d.mu.Lock() + defer d.mu.Unlock() + + // TODO: the drop algorithm needs a design. + // Currently, for drop rate 3/5: + // 0 1 2 3 4 + // d d d n n + if d.i < d.numerator { + ret = true + } + d.i++ + if d.i >= d.denominator { + d.i = 0 + } + + return +} diff --git a/balancer/xds/edsbalancer/util_test.go b/balancer/xds/edsbalancer/util_test.go new file mode 100644 index 00000000..fa5338f8 --- /dev/null +++ b/balancer/xds/edsbalancer/util_test.go @@ -0,0 +1,76 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package edsbalancer + +import ( + "testing" +) + +func TestDropper(t *testing.T) { + const repeat = 2 + + type args struct { + numerator uint32 + denominator uint32 + } + tests := []struct { + name string + args args + }{ + { + name: "2_3", + args: args{ + numerator: 2, + denominator: 3, + }, + }, + { + name: "4_8", + args: args{ + numerator: 4, + denominator: 8, + }, + }, + { + name: "7_20", + args: args{ + numerator: 7, + denominator: 20, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + d := newDropper(tt.args.numerator, tt.args.denominator) + var ( + dCount int + wantCount = int(tt.args.numerator) * repeat + loopCount = int(tt.args.denominator) * repeat + ) + for i := 0; i < loopCount; i++ { + if d.drop() { + dCount++ + } + } + + if dCount != (wantCount) { + t.Errorf("with numerator %v, denominator %v repeat %v, got drop count: %v, want %v", + tt.args.numerator, tt.args.denominator, repeat, dCount, wantCount) + } + }) + } +} diff --git a/balancer/xds/xds.go b/balancer/xds/xds.go index 53f4c6e5..2f78327c 100644 --- a/balancer/xds/xds.go +++ b/balancer/xds/xds.go @@ -48,7 +48,9 @@ var ( // TODO: if later we make startupTimeout configurable through BuildOptions(maybe?), then we can remove // this field and configure through BuildOptions instead. startupTimeout = defaultTimeout - newEDSBalancer = edsbalancer.NewXDSBalancer + newEDSBalancer = func(cc balancer.ClientConn) edsBalancerInterface { + return edsbalancer.NewXDSBalancer(cc) + } ) func init() { @@ -86,13 +88,19 @@ func (b *xdsBalancerBuilder) Name() string { return xdsName } -// EdsBalancer defines the interface that edsBalancer must implement to communicate with xdsBalancer. -type EdsBalancer interface { - balancer.Balancer +// edsBalancerInterface defines the interface that edsBalancer must implement to +// communicate with xdsBalancer. 
+// +// It's implemented by the real eds balancer and a fake testing eds balancer. +type edsBalancerInterface interface { // HandleEDSResponse passes the received EDS message from traffic director to eds balancer. HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment) // HandleChildPolicy updates the eds balancer the intra-cluster load balancing policy to use. HandleChildPolicy(name string, config json.RawMessage) + // HandleSubConnStateChange handles state change for SubConn. + HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) + // Close closes the eds balancer. + Close() } // xdsBalancer manages xdsClient and the actual balancer that does load balancing (either edsBalancer, @@ -116,7 +124,7 @@ type xdsBalancer struct { client *client // may change when passed a different service config config *xdsConfig // may change when passed a different service config - xdsLB EdsBalancer + xdsLB edsBalancerInterface fallbackLB balancer.Balancer fallbackInitData *addressUpdate // may change when HandleResolved address is called } @@ -447,7 +455,7 @@ func (x *xdsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() { x.fallbackLB.Close() x.fallbackLB = nil } - x.xdsLB = newEDSBalancer(x.cc).(EdsBalancer) + x.xdsLB = newEDSBalancer(x.cc) if x.config.ChildPolicy != nil { x.xdsLB.HandleChildPolicy(x.config.ChildPolicy.Name, x.config.ChildPolicy.Config) } diff --git a/balancer/xds/xds_test.go b/balancer/xds/xds_test.go index b5eb04a4..7d922c17 100644 --- a/balancer/xds/xds_test.go +++ b/balancer/xds/xds_test.go @@ -30,7 +30,6 @@ import ( xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/xds/edsbalancer" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/resolver" ) @@ -204,10 +203,6 @@ func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state co f.subconnStateChange <- &scStateChange{sc: sc, state: state} } -func (f *fakeEDSBalancer) 
HandleResolvedAddrs([]resolver.Address, error) { - panic("implement me") -} - func (f *fakeEDSBalancer) Close() { mu.Lock() defer mu.Unlock() @@ -225,7 +220,7 @@ func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) } } -func newFakeEDSBalancer(cc balancer.ClientConn) interface{} { +func newFakeEDSBalancer(cc balancer.ClientConn) edsBalancerInterface { lb := &fakeEDSBalancer{ cc: cc, edsChan: make(chan *xdspb.ClusterLoadAssignment, 10), @@ -286,10 +281,11 @@ func (s) TestXdsBalanceHandleResolvedAddrs(t *testing.T) { func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) { startupTimeout = 500 * time.Millisecond + originalNewEDSBalancer := newEDSBalancer newEDSBalancer = newFakeEDSBalancer defer func() { startupTimeout = defaultTimeout - newEDSBalancer = edsbalancer.NewXDSBalancer + newEDSBalancer = originalNewEDSBalancer }() builder := balancer.Get("xds") @@ -361,9 +357,10 @@ func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) { // switch child policy, lb stays the same // cds->eds or eds -> cds, restart xdsClient, lb stays the same func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) { + originalNewEDSBalancer := newEDSBalancer newEDSBalancer = newFakeEDSBalancer defer func() { - newEDSBalancer = edsbalancer.NewXDSBalancer + newEDSBalancer = originalNewEDSBalancer }() builder := balancer.Get("xds") @@ -447,10 +444,12 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) { // not in fallback mode, overwrite fallback info. // in fallback mode, update config or switch balancer. 
func (s) TestXdsBalanceHandleBalancerConfigFallbackUpdate(t *testing.T) { + originalNewEDSBalancer := newEDSBalancer newEDSBalancer = newFakeEDSBalancer defer func() { - newEDSBalancer = edsbalancer.NewXDSBalancer + newEDSBalancer = originalNewEDSBalancer }() + builder := balancer.Get("xds") cc := newTestClientConn() lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer) @@ -525,10 +524,12 @@ func (s) TestXdsBalanceHandleBalancerConfigFallbackUpdate(t *testing.T) { } func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) { + originalNewEDSBalancer := newEDSBalancer newEDSBalancer = newFakeEDSBalancer defer func() { - newEDSBalancer = edsbalancer.NewXDSBalancer + newEDSBalancer = originalNewEDSBalancer }() + builder := balancer.Get("xds") cc := newTestClientConn() lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer) @@ -605,10 +606,12 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) { } func (s) TestXdsBalancerFallbackSignalFromEdsBalancer(t *testing.T) { + originalNewEDSBalancer := newEDSBalancer newEDSBalancer = newFakeEDSBalancer defer func() { - newEDSBalancer = edsbalancer.NewXDSBalancer + newEDSBalancer = originalNewEDSBalancer }() + builder := balancer.Get("xds") cc := newTestClientConn() lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)