diff --git a/balancer/xds/edsbalancer/balancergroup.go b/balancer/xds/edsbalancer/balancergroup.go new file mode 100644 index 00000000..a77875bd --- /dev/null +++ b/balancer/xds/edsbalancer/balancergroup.go @@ -0,0 +1,337 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// TODO: move package comment to edsbalancer.go. + +// Package edsbalancer implements balancer generated from an eds response. +package edsbalancer + +import ( + "context" + "sync" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/resolver" +) + +type pickerState struct { + weight uint32 + picker balancer.Picker + state connectivity.State +} + +// balancerGroup takes a list of balancers, and make then into one balancer. +// +// Note that this struct doesn't implement balancer.Balancer, because it's not +// intended to be used directly as a balancer. It's expected to be used as a +// sub-balancer manager by a high level balancer. +// +// Updates from ClientConn are forwarded to sub-balancers +// - service config update +// - Not implemented +// - address update +// - subConn state change +// - find the corresponding balancer and forward +// +// Actions from sub-balances are forwarded to parent ClientConn +// - new/remove SubConn +// - picker update and health states change +// - sub-pickers are grouped into a group-picker +// - aggregated connectivity state is the overall state of all pickers. +// - resolveNow +type balancerGroup struct { + cc balancer.ClientConn + + mu sync.Mutex + idToBalancer map[string]balancer.Balancer + scToID map[balancer.SubConn]string + + pickerMu sync.Mutex + // All balancer IDs exist as keys in this map. If an ID is not in map, it's + // either removed or never added. + idToPickerState map[string]*pickerState +} + +func newBalancerGroup(cc balancer.ClientConn) *balancerGroup { + return &balancerGroup{ + cc: cc, + + scToID: make(map[balancer.SubConn]string), + idToBalancer: make(map[string]balancer.Balancer), + idToPickerState: make(map[string]*pickerState), + } +} + +// add adds a balancer built by builder to the group, with given id and weight. +func (bg *balancerGroup) add(id string, weight uint32, builder balancer.Builder) { + bg.mu.Lock() + if _, ok := bg.idToBalancer[id]; ok { + bg.mu.Unlock() + grpclog.Warningf("balancer group: adding a balancer with existing ID: %s", id) + return + } + bg.mu.Unlock() + bgcc := &balancerGroupCC{ + id: id, + group: bg, + } + b := builder.Build(bgcc, balancer.BuildOptions{}) + bg.mu.Lock() + bg.idToBalancer[id] = b + bg.mu.Unlock() + + bg.pickerMu.Lock() + bg.idToPickerState[id] = &pickerState{ + weight: weight, + // Start everything in IDLE. It's doesn't affect the overall state + // because we don't count IDLE when aggregating (as opposite to e.g. + // READY, 1 READY results in overall READY). + state: connectivity.Idle, + } + bg.pickerMu.Unlock() +} + +// remove removes the balancer with id from the group, and closes the balancer. +// +// It also removes the picker generated from this balancer from the picker +// group. It always results in a picker update. +func (bg *balancerGroup) remove(id string) { + bg.mu.Lock() + // Close balancer. + if b, ok := bg.idToBalancer[id]; ok { + b.Close() + delete(bg.idToBalancer, id) + } + // Remove SubConns. + for sc, bid := range bg.scToID { + if bid == id { + bg.cc.RemoveSubConn(sc) + delete(bg.scToID, sc) + } + } + bg.mu.Unlock() + + bg.pickerMu.Lock() + // Remove id and picker from picker map. This also results in future updates + // for this ID to be ignored. + delete(bg.idToPickerState, id) + // Update state and picker to reflect the changes. + bg.cc.UpdateBalancerState(buildPickerAndState(bg.idToPickerState)) + bg.pickerMu.Unlock() +} + +// changeWeight changes the weight of the balancer. +// +// NOTE: It always results in a picker update now. This probably isn't +// necessary. But it seems better to do the update because it's a change in the +// picker (which is balancer's snapshot). +func (bg *balancerGroup) changeWeight(id string, newWeight uint32) { + bg.pickerMu.Lock() + defer bg.pickerMu.Unlock() + pState, ok := bg.idToPickerState[id] + if !ok { + return + } + if pState.weight == newWeight { + return + } + pState.weight = newWeight + // Update state and picker to reflect the changes. + bg.cc.UpdateBalancerState(buildPickerAndState(bg.idToPickerState)) +} + +// Following are actions from the parent grpc.ClientConn, forward to sub-balancers. + +// SubConn state change: find the corresponding balancer and then forward. +func (bg *balancerGroup) handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { + grpclog.Infof("balancer group: handle subconn state change: %p, %v", sc, state) + bg.mu.Lock() + var b balancer.Balancer + if id, ok := bg.scToID[sc]; ok { + if state == connectivity.Shutdown { + // Only delete sc from the map when state changed to Shutdown. + delete(bg.scToID, sc) + } + b = bg.idToBalancer[id] + } + bg.mu.Unlock() + if b == nil { + grpclog.Infof("balancer group: balancer not found for sc state change") + return + } + b.HandleSubConnStateChange(sc, state) +} + +// Address change: forward to balancer. +func (bg *balancerGroup) handleResolvedAddrs(id string, addrs []resolver.Address) { + bg.mu.Lock() + b, ok := bg.idToBalancer[id] + bg.mu.Unlock() + if !ok { + grpclog.Infof("balancer group: balancer with id %q not found", id) + return + } + b.HandleResolvedAddrs(addrs, nil) +} + +// TODO: handleServiceConfig() +// +// For BNS address for slicer, comes from endpoint.Metadata. It will be sent +// from parent to sub-balancers as service config. + +// Following are actions from sub-balancers, forward to ClientConn. + +// newSubConn: forward to ClientConn, and also create a map from sc to balancer, +// so state update will find the right balancer. +// +// One note about removing SubConn: only forward to ClientConn, but not delete +// from map. Delete sc from the map only when state changes to Shutdown. Since +// it's just forwarding the action, there's no need for a removeSubConn() +// wrapper function. +func (bg *balancerGroup) newSubConn(id string, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + sc, err := bg.cc.NewSubConn(addrs, opts) + if err != nil { + return nil, err + } + bg.mu.Lock() + bg.scToID[sc] = id + bg.mu.Unlock() + return sc, nil +} + +// updateBalancerState: create an aggregated picker and an aggregated +// connectivity state, then forward to ClientConn. +func (bg *balancerGroup) updateBalancerState(id string, state connectivity.State, picker balancer.Picker) { + grpclog.Infof("balancer group: update balancer state: %v, %v, %p", id, state, picker) + bg.pickerMu.Lock() + defer bg.pickerMu.Unlock() + pickerSt, ok := bg.idToPickerState[id] + if !ok { + // All state starts in IDLE. If ID is not in map, it's either removed, + // or never existed. + grpclog.Infof("balancer group: pickerState not found when update picker/state") + return + } + pickerSt.picker = picker + pickerSt.state = state + bg.cc.UpdateBalancerState(buildPickerAndState(bg.idToPickerState)) +} + +func (bg *balancerGroup) close() { + bg.mu.Lock() + for _, b := range bg.idToBalancer { + b.Close() + } + bg.mu.Unlock() +} + +func buildPickerAndState(m map[string]*pickerState) (connectivity.State, balancer.Picker) { + var readyN, connectingN int + readyPickerWithWeights := make([]pickerState, 0, len(m)) + for _, ps := range m { + switch ps.state { + case connectivity.Ready: + readyN++ + readyPickerWithWeights = append(readyPickerWithWeights, *ps) + case connectivity.Connecting: + connectingN++ + } + } + var aggregatedState connectivity.State + switch { + case readyN > 0: + aggregatedState = connectivity.Ready + case connectingN > 0: + aggregatedState = connectivity.Connecting + default: + aggregatedState = connectivity.TransientFailure + } + if aggregatedState == connectivity.TransientFailure { + return aggregatedState, base.NewErrPicker(balancer.ErrTransientFailure) + } + return aggregatedState, newPickerGroup(readyPickerWithWeights) +} + +type pickerGroup struct { + readyPickerWithWeights []pickerState + length int + + mu sync.Mutex + idx int // The index of the picker that will be picked + count uint32 // The number of times the current picker has been picked. +} + +// newPickerGroup takes pickers with weights, and group them into one picker. +// +// Note it only takes ready pickers. The map shouldn't contain non-ready +// pickers. +// +// TODO: (bg) confirm this is the expected behavior: non-ready balancers should +// be ignored when picking. Only ready balancers are picked. +func newPickerGroup(readyPickerWithWeights []pickerState) *pickerGroup { + return &pickerGroup{ + readyPickerWithWeights: readyPickerWithWeights, + length: len(readyPickerWithWeights), + } +} + +func (pg *pickerGroup) Pick(ctx context.Context, opts balancer.PickOptions) (conn balancer.SubConn, done func(balancer.DoneInfo), err error) { + if pg.length <= 0 { + return nil, nil, balancer.ErrNoSubConnAvailable + } + // TODO: the WRR algorithm needs a design. + // MAYBE: move WRR implmentation to util.go as a separate struct. + pg.mu.Lock() + pickerSt := pg.readyPickerWithWeights[pg.idx] + p := pickerSt.picker + pg.count++ + if pg.count >= pickerSt.weight { + pg.idx = (pg.idx + 1) % pg.length + pg.count = 0 + } + pg.mu.Unlock() + return p.Pick(ctx, opts) +} + +// balancerGroupCC implements the balancer.ClientConn API and get passed to each +// sub-balancer. It contains the sub-balancer ID, so the parent balancer can +// keep track of SubConn/pickers and the sub-balancers they belong to. +// +// Some of the actions are forwarded to the parent ClientConn with no change. +// Some are forward to balancer group with the sub-balancer ID. +type balancerGroupCC struct { + id string + group *balancerGroup +} + +func (bgcc *balancerGroupCC) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + return bgcc.group.newSubConn(bgcc.id, addrs, opts) +} +func (bgcc *balancerGroupCC) RemoveSubConn(sc balancer.SubConn) { + bgcc.group.cc.RemoveSubConn(sc) +} +func (bgcc *balancerGroupCC) UpdateBalancerState(state connectivity.State, picker balancer.Picker) { + bgcc.group.updateBalancerState(bgcc.id, state, picker) +} +func (bgcc *balancerGroupCC) ResolveNow(opt resolver.ResolveNowOption) { + bgcc.group.cc.ResolveNow(opt) +} +func (bgcc *balancerGroupCC) Target() string { + return bgcc.group.cc.Target() +} diff --git a/balancer/xds/edsbalancer/balancergroup_test.go b/balancer/xds/edsbalancer/balancergroup_test.go new file mode 100644 index 00000000..580d9069 --- /dev/null +++ b/balancer/xds/edsbalancer/balancergroup_test.go @@ -0,0 +1,376 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package edsbalancer + +import ( + "context" + "reflect" + "testing" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/resolver" +) + +var ( + rrBuilder = balancer.Get(roundrobin.Name) + testBalancerIDs = []string{"b1", "b2", "b3"} + testBackendAddrs = []resolver.Address{{Addr: "1.1.1.1:1"}, {Addr: "2.2.2.2:2"}, {Addr: "3.3.3.3:3"}, {Addr: "4.4.4.4:4"}} +) + +// 1 balancer, 1 backend -> 2 backends -> 1 backend. +func TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) { + cc := newTestClientConn(t) + bg := newBalancerGroup(cc) + + // Add one balancer to group. + bg.add(testBalancerIDs[0], 1, rrBuilder) + // Send one resolved address. + bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1]) + + // Send subconn state change. + sc1 := <-cc.newSubConnCh + bg.handleSubConnStateChange(sc1, connectivity.Connecting) + bg.handleSubConnStateChange(sc1, connectivity.Ready) + + // Test pick with one backend. + p1 := <-cc.newPickerCh + for i := 0; i < 5; i++ { + gotSC, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) + if !reflect.DeepEqual(gotSC, sc1) { + t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc1) + } + } + + // Send two addresses. + bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2]) + // Expect one new subconn, send state update. + sc2 := <-cc.newSubConnCh + bg.handleSubConnStateChange(sc2, connectivity.Connecting) + bg.handleSubConnStateChange(sc2, connectivity.Ready) + + // Test roundrobin pick. + p2 := <-cc.newPickerCh + want := []balancer.SubConn{sc1, sc2} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } + + // Remove the first address. + bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[1:2]) + scToRemove := <-cc.removeSubConnCh + if !reflect.DeepEqual(scToRemove, sc1) { + t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove) + } + bg.handleSubConnStateChange(scToRemove, connectivity.Shutdown) + + // Test pick with only the second subconn. + p3 := <-cc.newPickerCh + for i := 0; i < 5; i++ { + gotSC, _, _ := p3.Pick(context.Background(), balancer.PickOptions{}) + if !reflect.DeepEqual(gotSC, sc2) { + t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc2) + } + } +} + +// 2 balancers, each with 1 backend. +func TestBalancerGroup_TwoRR_OneBackend(t *testing.T) { + cc := newTestClientConn(t) + bg := newBalancerGroup(cc) + + // Add two balancers to group and send one resolved address to both + // balancers. + bg.add(testBalancerIDs[0], 1, rrBuilder) + bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1]) + sc1 := <-cc.newSubConnCh + + bg.add(testBalancerIDs[1], 1, rrBuilder) + bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[0:1]) + sc2 := <-cc.newSubConnCh + + // Send state changes for both subconns. + bg.handleSubConnStateChange(sc1, connectivity.Connecting) + bg.handleSubConnStateChange(sc1, connectivity.Ready) + bg.handleSubConnStateChange(sc2, connectivity.Connecting) + bg.handleSubConnStateChange(sc2, connectivity.Ready) + + // Test roundrobin on the last picker. + p1 := <-cc.newPickerCh + want := []balancer.SubConn{sc1, sc2} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } +} + +// 2 balancers, each with more than 1 backends. +func TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) { + cc := newTestClientConn(t) + bg := newBalancerGroup(cc) + + // Add two balancers to group and send one resolved address to both + // balancers. + bg.add(testBalancerIDs[0], 1, rrBuilder) + bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2]) + sc1 := <-cc.newSubConnCh + sc2 := <-cc.newSubConnCh + + bg.add(testBalancerIDs[1], 1, rrBuilder) + bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4]) + sc3 := <-cc.newSubConnCh + sc4 := <-cc.newSubConnCh + + // Send state changes for both subconns. + bg.handleSubConnStateChange(sc1, connectivity.Connecting) + bg.handleSubConnStateChange(sc1, connectivity.Ready) + bg.handleSubConnStateChange(sc2, connectivity.Connecting) + bg.handleSubConnStateChange(sc2, connectivity.Ready) + bg.handleSubConnStateChange(sc3, connectivity.Connecting) + bg.handleSubConnStateChange(sc3, connectivity.Ready) + bg.handleSubConnStateChange(sc4, connectivity.Connecting) + bg.handleSubConnStateChange(sc4, connectivity.Ready) + + // Test roundrobin on the last picker. + p1 := <-cc.newPickerCh + want := []balancer.SubConn{sc1, sc2, sc3, sc4} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } + + // Turn sc2's connection down, should be RR between balancers. + bg.handleSubConnStateChange(sc2, connectivity.TransientFailure) + p2 := <-cc.newPickerCh + // Expect two sc1's in the result, because balancer1 will be picked twice, + // but there's only one sc in it. + want = []balancer.SubConn{sc1, sc1, sc3, sc4} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } + + // Remove sc3's addresses. + bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[3:4]) + scToRemove := <-cc.removeSubConnCh + if !reflect.DeepEqual(scToRemove, sc3) { + t.Fatalf("RemoveSubConn, want %v, got %v", sc3, scToRemove) + } + bg.handleSubConnStateChange(scToRemove, connectivity.Shutdown) + p3 := <-cc.newPickerCh + want = []balancer.SubConn{sc1, sc4} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p3.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } + + // Turn sc1's connection down. + bg.handleSubConnStateChange(sc1, connectivity.TransientFailure) + p4 := <-cc.newPickerCh + want = []balancer.SubConn{sc4} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p4.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } + + // Turn last connection to connecting. + bg.handleSubConnStateChange(sc4, connectivity.Connecting) + p5 := <-cc.newPickerCh + for i := 0; i < 5; i++ { + if _, _, err := p5.Pick(context.Background(), balancer.PickOptions{}); err != balancer.ErrNoSubConnAvailable { + t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err) + } + } + + // Turn all connections down. + bg.handleSubConnStateChange(sc4, connectivity.TransientFailure) + p6 := <-cc.newPickerCh + for i := 0; i < 5; i++ { + if _, _, err := p6.Pick(context.Background(), balancer.PickOptions{}); err != balancer.ErrTransientFailure { + t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err) + } + } +} + +// 2 balancers with different weights. +func TestBalancerGroup_TwoRR_DifferentWeight_MoreBackends(t *testing.T) { + cc := newTestClientConn(t) + bg := newBalancerGroup(cc) + + // Add two balancers to group and send two resolved addresses to both + // balancers. + bg.add(testBalancerIDs[0], 2, rrBuilder) + bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2]) + sc1 := <-cc.newSubConnCh + sc2 := <-cc.newSubConnCh + + bg.add(testBalancerIDs[1], 1, rrBuilder) + bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4]) + sc3 := <-cc.newSubConnCh + sc4 := <-cc.newSubConnCh + + // Send state changes for both subconns. + bg.handleSubConnStateChange(sc1, connectivity.Connecting) + bg.handleSubConnStateChange(sc1, connectivity.Ready) + bg.handleSubConnStateChange(sc2, connectivity.Connecting) + bg.handleSubConnStateChange(sc2, connectivity.Ready) + bg.handleSubConnStateChange(sc3, connectivity.Connecting) + bg.handleSubConnStateChange(sc3, connectivity.Ready) + bg.handleSubConnStateChange(sc4, connectivity.Connecting) + bg.handleSubConnStateChange(sc4, connectivity.Ready) + + // Test roundrobin on the last picker. + p1 := <-cc.newPickerCh + want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } +} + +// totally 3 balancers, add/remove balancer. +func TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) { + cc := newTestClientConn(t) + bg := newBalancerGroup(cc) + + // Add three balancers to group and send one resolved address to both + // balancers. + bg.add(testBalancerIDs[0], 1, rrBuilder) + bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1]) + sc1 := <-cc.newSubConnCh + + bg.add(testBalancerIDs[1], 1, rrBuilder) + bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[1:2]) + sc2 := <-cc.newSubConnCh + + bg.add(testBalancerIDs[2], 1, rrBuilder) + bg.handleResolvedAddrs(testBalancerIDs[2], testBackendAddrs[1:2]) + sc3 := <-cc.newSubConnCh + + // Send state changes for both subconns. + bg.handleSubConnStateChange(sc1, connectivity.Connecting) + bg.handleSubConnStateChange(sc1, connectivity.Ready) + bg.handleSubConnStateChange(sc2, connectivity.Connecting) + bg.handleSubConnStateChange(sc2, connectivity.Ready) + bg.handleSubConnStateChange(sc3, connectivity.Connecting) + bg.handleSubConnStateChange(sc3, connectivity.Ready) + + p1 := <-cc.newPickerCh + want := []balancer.SubConn{sc1, sc2, sc3} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } + + // Remove the second balancer, while the others two are ready. + bg.remove(testBalancerIDs[1]) + scToRemove := <-cc.removeSubConnCh + if !reflect.DeepEqual(scToRemove, sc2) { + t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove) + } + p2 := <-cc.newPickerCh + want = []balancer.SubConn{sc1, sc3} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } + + // move balancer 3 into transient failure. + bg.handleSubConnStateChange(sc3, connectivity.TransientFailure) + // Remove the first balancer, while the third is transient failure. + bg.remove(testBalancerIDs[0]) + scToRemove = <-cc.removeSubConnCh + if !reflect.DeepEqual(scToRemove, sc1) { + t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove) + } + p3 := <-cc.newPickerCh + for i := 0; i < 5; i++ { + if _, _, err := p3.Pick(context.Background(), balancer.PickOptions{}); err != balancer.ErrTransientFailure { + t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err) + } + } +} + +// 2 balancers, change balancer weight. +func TestBalancerGroup_TwoRR_ChangeWeight_MoreBackends(t *testing.T) { + cc := newTestClientConn(t) + bg := newBalancerGroup(cc) + + // Add two balancers to group and send two resolved addresses to both + // balancers. + bg.add(testBalancerIDs[0], 2, rrBuilder) + bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2]) + sc1 := <-cc.newSubConnCh + sc2 := <-cc.newSubConnCh + + bg.add(testBalancerIDs[1], 1, rrBuilder) + bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4]) + sc3 := <-cc.newSubConnCh + sc4 := <-cc.newSubConnCh + + // Send state changes for both subconns. + bg.handleSubConnStateChange(sc1, connectivity.Connecting) + bg.handleSubConnStateChange(sc1, connectivity.Ready) + bg.handleSubConnStateChange(sc2, connectivity.Connecting) + bg.handleSubConnStateChange(sc2, connectivity.Ready) + bg.handleSubConnStateChange(sc3, connectivity.Connecting) + bg.handleSubConnStateChange(sc3, connectivity.Ready) + bg.handleSubConnStateChange(sc4, connectivity.Connecting) + bg.handleSubConnStateChange(sc4, connectivity.Ready) + + // Test roundrobin on the last picker. + p1 := <-cc.newPickerCh + want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } + + bg.changeWeight(testBalancerIDs[0], 3) + + // Test roundrobin with new weight. + p2 := <-cc.newPickerCh + want = []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc2, sc3, sc4} + if err := isRoundRobin(want, func() balancer.SubConn { + sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{}) + return sc + }); err != nil { + t.Fatalf("want %v, got %v", want, err) + } +} diff --git a/balancer/xds/edsbalancer/test_util_test.go b/balancer/xds/edsbalancer/test_util_test.go new file mode 100644 index 00000000..ed0415d4 --- /dev/null +++ b/balancer/xds/edsbalancer/test_util_test.go @@ -0,0 +1,298 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package edsbalancer + +import ( + "fmt" + "testing" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/resolver" +) + +var ( + testSubConns = []*testSubConn{{id: "sc1"}, {id: "sc2"}, {id: "sc3"}, {id: "sc4"}} +) + +type testSubConn struct { + id string +} + +func (tsc *testSubConn) UpdateAddresses([]resolver.Address) { + panic("not implemented") +} + +func (tsc *testSubConn) Connect() { +} + +// Implement stringer to get human friendly error message. +func (tsc *testSubConn) String() string { + return tsc.id +} + +type testClientConn struct { + t *testing.T // For logging only. + + newSubConnAddrsCh chan []resolver.Address // The last 10 []Address to create subconn. + newSubConnCh chan balancer.SubConn // The last 10 subconn created. + removeSubConnCh chan balancer.SubConn // The last 10 subconn removed. + + newPickerCh chan balancer.Picker // The last picker updated. + newStateCh chan connectivity.State // The last state. + + subConnIdx int +} + +func newTestClientConn(t *testing.T) *testClientConn { + return &testClientConn{ + t: t, + + newSubConnAddrsCh: make(chan []resolver.Address, 10), + newSubConnCh: make(chan balancer.SubConn, 10), + removeSubConnCh: make(chan balancer.SubConn, 10), + + newPickerCh: make(chan balancer.Picker, 1), + newStateCh: make(chan connectivity.State, 1), + } +} + +func (tcc *testClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubConnOptions) (balancer.SubConn, error) { + sc := testSubConns[tcc.subConnIdx] + tcc.subConnIdx++ + + tcc.t.Logf("testClientConn: NewSubConn(%v, %+v) => %p", a, o, sc) + select { + case tcc.newSubConnAddrsCh <- a: + default: + } + + select { + case tcc.newSubConnCh <- sc: + default: + } + + return sc, nil +} + +func (tcc *testClientConn) RemoveSubConn(sc balancer.SubConn) { + tcc.t.Logf("testClientCOnn: RemoveSubConn(%p)", sc) + select { + case tcc.removeSubConnCh <- sc: + default: + } +} + +func (tcc *testClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) { + tcc.t.Logf("testClientConn: UpdateBalancerState(%v, %p)", s, p) + select { + case <-tcc.newStateCh: + default: + } + tcc.newStateCh <- s + + select { + case <-tcc.newPickerCh: + default: + } + tcc.newPickerCh <- p +} + +func (tcc *testClientConn) ResolveNow(resolver.ResolveNowOption) { + panic("not implemented") +} + +func (tcc *testClientConn) Target() string { + panic("not implemented") +} + +// isRoundRobin checks whether f's return value is roundrobin of elements from +// want. But it doesn't check for the order. Note that want can contain +// duplicate items, which makes it weight-round-robin. +// +// Step 1. the return values of f should form a permutation of all elements in +// want, but not necessary in the same order. E.g. if want is {a,a,b}, the check +// fails if f returns: +// - {a,a,a}: third a is returned before b +// - {a,b,b}: second b is returned before the second a +// +// If error is found in this step, the returned error contains only the first +// iteration until where it goes wrong. +// +// Step 2. the return values of f should be repetitions of the same permutation. +// E.g. if want is {a,a,b}, the check failes if f returns: +// - {a,b,a,b,a,a}: though it satisfies step 1, the second iteration is not +// repeating the first iteration. +// +// If error is found in this step, the returned error contains the first +// iteration + the second iteration until where it goes wrong. +func isRoundRobin(want []balancer.SubConn, f func() balancer.SubConn) error { + wantSet := make(map[balancer.SubConn]int) // SubConn -> count, for weighted RR. + for _, sc := range want { + wantSet[sc]++ + } + + // The first iteration: makes sure f's return values form a permutation of + // elements in want. + // + // Also keep the returns values in a slice, so we can compare the order in + // the second iteration. + gotSliceFirstIteration := make([]balancer.SubConn, 0, len(want)) + for range want { + got := f() + gotSliceFirstIteration = append(gotSliceFirstIteration, got) + wantSet[got]-- + if wantSet[got] < 0 { + return fmt.Errorf("non-roundrobin want: %v, result: %v", want, gotSliceFirstIteration) + } + } + + // The second iteration should repeat the first iteration. + var gotSliceSecondIteration []balancer.SubConn + for i := 0; i < 2; i++ { + for _, w := range gotSliceFirstIteration { + g := f() + gotSliceSecondIteration = append(gotSliceSecondIteration, g) + if w != g { + return fmt.Errorf("non-roundrobin, first iter: %v, second iter: %v", gotSliceFirstIteration, gotSliceSecondIteration) + } + } + } + + return nil +} + +// testClosure is a test util for TestIsRoundRobin. +type testClosure struct { + r []balancer.SubConn + i int +} + +func (tc *testClosure) next() balancer.SubConn { + ret := tc.r[tc.i] + tc.i = (tc.i + 1) % len(tc.r) + return ret +} + +func TestIsRoundRobin(t *testing.T) { + var ( + sc1 = testSubConns[0] + sc2 = testSubConns[1] + sc3 = testSubConns[2] + ) + + testCases := []struct { + desc string + want []balancer.SubConn + got []balancer.SubConn + pass bool + }{ + { + desc: "0 element", + want: []balancer.SubConn{}, + got: []balancer.SubConn{}, + pass: true, + }, + { + desc: "1 element RR", + want: []balancer.SubConn{sc1}, + got: []balancer.SubConn{sc1, sc1, sc1, sc1}, + pass: true, + }, + { + desc: "1 element not RR", + want: []balancer.SubConn{sc1}, + got: []balancer.SubConn{sc1, sc2, sc1}, + pass: false, + }, + { + desc: "2 elements RR", + want: []balancer.SubConn{sc1, sc2}, + got: []balancer.SubConn{sc1, sc2, sc1, sc2, sc1, sc2}, + pass: true, + }, + { + desc: "2 elements RR different order from want", + want: []balancer.SubConn{sc2, sc1}, + got: []balancer.SubConn{sc1, sc2, sc1, sc2, sc1, sc2}, + pass: true, + }, + { + desc: "2 elements RR not RR, mistake in first iter", + want: []balancer.SubConn{sc1, sc2}, + got: []balancer.SubConn{sc1, sc1, sc1, sc2, sc1, sc2}, + pass: false, + }, + { + desc: "2 elements RR not RR, mistake in second iter", + want: []balancer.SubConn{sc1, sc2}, + got: []balancer.SubConn{sc1, sc2, sc1, sc1, sc1, sc2}, + pass: false, + }, + { + desc: "2 elements weighted RR", + want: []balancer.SubConn{sc1, sc1, sc2}, + got: []balancer.SubConn{sc1, sc1, sc2, sc1, sc1, sc2}, + pass: true, + }, + { + desc: "2 elements weighted RR different order", + want: []balancer.SubConn{sc1, sc1, sc2}, + got: []balancer.SubConn{sc1, sc2, sc1, sc1, sc2, sc1}, + pass: true, + }, + + { + desc: "3 elements RR", + want: []balancer.SubConn{sc1, sc2, sc3}, + got: []balancer.SubConn{sc1, sc2, sc3, sc1, sc2, sc3, sc1, sc2, sc3}, + pass: true, + }, + { + desc: "3 elements RR different order", + want: []balancer.SubConn{sc1, sc2, sc3}, + got: []balancer.SubConn{sc3, sc2, sc1, sc3, sc2, sc1}, + pass: true, + }, + { + desc: "3 elements weighted RR", + want: []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc3}, + got: []balancer.SubConn{sc1, sc2, sc3, sc1, sc2, sc1, sc1, sc2, sc3, sc1, sc2, sc1}, + pass: true, + }, + { + desc: "3 elements weighted RR not RR, mistake in first iter", + want: []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc3}, + got: []balancer.SubConn{sc1, sc2, sc1, sc1, sc2, sc1, sc1, sc2, sc3, sc1, sc2, sc1}, + pass: false, + }, + { + desc: "3 elements weighted RR not RR, mistake in second iter", + want: []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc3}, + got: []balancer.SubConn{sc1, sc2, sc3, sc1, sc2, sc1, sc1, sc1, sc3, sc1, sc2, sc1}, + pass: false, + }, + } + for _, tC := range testCases { + t.Run(tC.desc, func(t *testing.T) { + err := isRoundRobin(tC.want, (&testClosure{r: tC.got}).next) + if err == nil != tC.pass { + t.Errorf("want pass %v, want %v, got err %v", tC.pass, tC.want, err) + } + }) + } +} diff --git a/balancer/xds/xds.go b/balancer/xds/xds.go new file mode 100644 index 00000000..2bd77d48 --- /dev/null +++ b/balancer/xds/xds.go @@ -0,0 +1,18 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package xds implements xds balancer. +package xds // import "google.golang.org/grpc/balancer/xds"