xds: move balancer group to a separate package (#3493)

This PR moves balancer group to a separate package and exports the type with some methods. Balancer group will be used by the weighted_target policy.
This commit is contained in:
Menghan Li
2020-04-06 15:12:58 -07:00
committed by GitHub
parent c5faf568da
commit 98e4c7ad3e
13 changed files with 1077 additions and 951 deletions

View File

@ -14,7 +14,9 @@
* limitations under the License.
*/
package edsbalancer
// Package balancergroup implements a utility struct to bind multiple balancers
// into one balancer.
package balancergroup
import (
"fmt"
@ -52,7 +54,7 @@ type subBalancerWithConfig struct {
// Some are forward to balancer group with the sub-balancer ID.
balancer.ClientConn
id internal.Locality
group *balancerGroup
group *BalancerGroup
mu sync.Mutex
state balancer.State
@ -154,7 +156,7 @@ func (s *pickerState) String() string {
return fmt.Sprintf("weight:%v,picker:%p,state:%v", s.weight, s.picker, s.state)
}
// balancerGroup takes a list of balancers, and make then into one balancer.
// BalancerGroup takes a list of balancers, and make them into one balancer.
//
// Note that this struct doesn't implement balancer.Balancer, because it's not
// intended to be used directly as a balancer. It's expected to be used as a
@ -178,7 +180,7 @@ func (s *pickerState) String() string {
// balancer group is closed, the sub-balancers are also closed. And it's
// guaranteed that no updates will be sent to parent ClientConn from a closed
// balancer group.
type balancerGroup struct {
type BalancerGroup struct {
cc balancer.ClientConn
logger *grpclog.PrefixLogger
loadStore lrs.Store
@ -226,26 +228,34 @@ type balancerGroup struct {
idToPickerState map[internal.Locality]*pickerState
}
// defaultSubBalancerCloseTimeout is defined as a variable instead of const for
// DefaultSubBalancerCloseTimeout is defined as a variable instead of const for
// testing.
//
// TODO: make it a parameter for newBalancerGroup().
var defaultSubBalancerCloseTimeout = 15 * time.Minute
// TODO: make it a parameter for New().
var DefaultSubBalancerCloseTimeout = 15 * time.Minute
func newBalancerGroup(cc balancer.ClientConn, loadStore lrs.Store, logger *grpclog.PrefixLogger) *balancerGroup {
return &balancerGroup{
// New creates a new BalancerGroup. Note that the BalancerGroup
// needs to be started to work.
func New(cc balancer.ClientConn, loadStore lrs.Store, logger *grpclog.PrefixLogger) *BalancerGroup {
return &BalancerGroup{
cc: cc,
logger: logger,
loadStore: loadStore,
idToBalancerConfig: make(map[internal.Locality]*subBalancerWithConfig),
balancerCache: cache.NewTimeoutCache(defaultSubBalancerCloseTimeout),
balancerCache: cache.NewTimeoutCache(DefaultSubBalancerCloseTimeout),
scToSubBalancer: make(map[balancer.SubConn]*subBalancerWithConfig),
idToPickerState: make(map[internal.Locality]*pickerState),
}
}
func (bg *balancerGroup) start() {
// Start starts the balancer group, including building all the sub-balancers,
// and send the existing addresses to them.
//
// A BalancerGroup can be closed and started later. When a BalancerGroup is
// closed, it can still receive address updates, which will be applied when
// restarted.
func (bg *BalancerGroup) Start() {
bg.incomingMu.Lock()
bg.incomingStarted = true
bg.incomingMu.Unlock()
@ -263,12 +273,12 @@ func (bg *balancerGroup) start() {
bg.outgoingMu.Unlock()
}
// add adds a balancer built by builder to the group, with given id and weight.
// Add adds a balancer built by builder to the group, with given id and weight.
//
// weight should never be zero.
func (bg *balancerGroup) add(id internal.Locality, weight uint32, builder balancer.Builder) {
func (bg *BalancerGroup) Add(id internal.Locality, weight uint32, builder balancer.Builder) {
if weight == 0 {
bg.logger.Errorf("balancerGroup.add called with weight 0, locality: %v. Locality is not added to balancer group", id)
bg.logger.Errorf("BalancerGroup.add called with weight 0, locality: %v. Locality is not added to balancer group", id)
return
}
@ -329,7 +339,7 @@ func (bg *balancerGroup) add(id internal.Locality, weight uint32, builder balanc
bg.outgoingMu.Unlock()
}
// remove removes the balancer with id from the group.
// Remove removes the balancer with id from the group.
//
// But doesn't close the balancer. The balancer is kept in a cache, and will be
// closed after timeout. Cleanup work (closing sub-balancer and removing
@ -337,7 +347,7 @@ func (bg *balancerGroup) add(id internal.Locality, weight uint32, builder balanc
//
// It also removes the picker generated from this balancer from the picker
// group. It always results in a picker update.
func (bg *balancerGroup) remove(id internal.Locality) {
func (bg *BalancerGroup) Remove(id internal.Locality) {
bg.outgoingMu.Lock()
if sbToRemove, ok := bg.idToBalancerConfig[id]; ok {
if bg.outgoingStarted {
@ -374,7 +384,7 @@ func (bg *balancerGroup) remove(id internal.Locality) {
// bg.remove(id) doesn't do cleanup for the sub-balancer. This function does
// cleanup after the timeout.
func (bg *balancerGroup) cleanupSubConns(config *subBalancerWithConfig) {
func (bg *BalancerGroup) cleanupSubConns(config *subBalancerWithConfig) {
bg.incomingMu.Lock()
// Remove SubConns. This is only done after the balancer is
// actually closed.
@ -393,16 +403,16 @@ func (bg *balancerGroup) cleanupSubConns(config *subBalancerWithConfig) {
bg.incomingMu.Unlock()
}
// changeWeight changes the weight of the balancer.
// ChangeWeight changes the weight of the balancer.
//
// newWeight should never be zero.
//
// NOTE: It always results in a picker update now. This probably isn't
// necessary. But it seems better to do the update because it's a change in the
// picker (which is balancer's snapshot).
func (bg *balancerGroup) changeWeight(id internal.Locality, newWeight uint32) {
func (bg *BalancerGroup) ChangeWeight(id internal.Locality, newWeight uint32) {
if newWeight == 0 {
bg.logger.Errorf("balancerGroup.changeWeight called with newWeight 0. Weight is not changed")
bg.logger.Errorf("BalancerGroup.changeWeight called with newWeight 0. Weight is not changed")
return
}
bg.incomingMu.Lock()
@ -425,8 +435,9 @@ func (bg *balancerGroup) changeWeight(id internal.Locality, newWeight uint32) {
// Following are actions from the parent grpc.ClientConn, forward to sub-balancers.
// SubConn state change: find the corresponding balancer and then forward.
func (bg *balancerGroup) handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
// HandleSubConnStateChange handles the state for the subconn. It finds the
// corresponding balancer and forwards the update.
func (bg *BalancerGroup) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
bg.incomingMu.Lock()
config, ok := bg.scToSubBalancer[sc]
if !ok {
@ -444,8 +455,12 @@ func (bg *balancerGroup) handleSubConnStateChange(sc balancer.SubConn, state con
bg.outgoingMu.Unlock()
}
// Address change: forward to balancer.
func (bg *balancerGroup) handleResolvedAddrs(id internal.Locality, addrs []resolver.Address) {
// HandleResolvedAddrs handles addresses from resolver. It finds the balancer
// and forwards the update.
//
// TODO: change this to UpdateClientConnState to handle addresses and balancer
// config.
func (bg *BalancerGroup) HandleResolvedAddrs(id internal.Locality, addrs []resolver.Address) {
bg.outgoingMu.Lock()
if config, ok := bg.idToBalancerConfig[id]; ok {
config.updateAddrs(addrs)
@ -467,7 +482,7 @@ func (bg *balancerGroup) handleResolvedAddrs(id internal.Locality, addrs []resol
// from map. Delete sc from the map only when state changes to Shutdown. Since
// it's just forwarding the action, there's no need for a removeSubConn()
// wrapper function.
func (bg *balancerGroup) newSubConn(config *subBalancerWithConfig, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
func (bg *BalancerGroup) newSubConn(config *subBalancerWithConfig, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
// NOTE: if balancer with id was already removed, this should also return
// error. But since we call balancer.stopBalancer when removing the balancer, this
// shouldn't happen.
@ -488,7 +503,7 @@ func (bg *balancerGroup) newSubConn(config *subBalancerWithConfig, addrs []resol
// updateBalancerState: create an aggregated picker and an aggregated
// connectivity state, then forward to ClientConn.
func (bg *balancerGroup) updateBalancerState(id internal.Locality, state balancer.State) {
func (bg *BalancerGroup) updateBalancerState(id internal.Locality, state balancer.State) {
bg.logger.Infof("Balancer state update from locality %v, new state: %+v", id, state)
bg.incomingMu.Lock()
@ -508,7 +523,9 @@ func (bg *balancerGroup) updateBalancerState(id internal.Locality, state balance
}
}
func (bg *balancerGroup) close() {
// Close closes the balancer. It stops sub-balancers, and removes the subconns.
// The BalancerGroup can be restarted later.
func (bg *BalancerGroup) Close() {
bg.incomingMu.Lock()
if bg.incomingStarted {
bg.incomingStarted = false
@ -568,8 +585,9 @@ func buildPickerAndState(m map[internal.Locality]*pickerState) balancer.State {
return balancer.State{ConnectivityState: aggregatedState, Picker: newPickerGroup(readyPickerWithWeights)}
}
// RandomWRR constructor, to be modified in tests.
var newRandomWRR = wrr.NewRandom
// NewRandomWRR is the WRR constructor used to pick sub-pickers from
// sub-balancers. It's to be modified in tests.
var NewRandomWRR = wrr.NewRandom
type pickerGroup struct {
length int
@ -584,7 +602,7 @@ type pickerGroup struct {
// TODO: (bg) confirm this is the expected behavior: non-ready balancers should
// be ignored when picking. Only ready balancers are picked.
func newPickerGroup(readyPickerWithWeights []pickerState) *pickerGroup {
w := newRandomWRR()
w := NewRandomWRR()
for _, ps := range readyPickerWithWeights {
w.Add(ps.picker, int64(ps.weight))
}

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package edsbalancer
package balancergroup
import (
"fmt"
@ -28,6 +28,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils"
)
var (
@ -45,7 +46,7 @@ func init() {
// Disable caching for all tests. It will be re-enabled in caching specific
// tests.
defaultSubBalancerCloseTimeout = time.Millisecond
DefaultSubBalancerCloseTimeout = time.Millisecond
}
func subConnFromPicker(p balancer.V2Picker) func() balancer.SubConn {
@ -57,56 +58,56 @@ func subConnFromPicker(p balancer.V2Picker) func() balancer.SubConn {
// 1 balancer, 1 backend -> 2 backends -> 1 backend.
func (s) TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) {
cc := newTestClientConn(t)
bg := newBalancerGroup(cc, nil, nil)
bg.start()
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
bg.Start()
// Add one balancer to group.
bg.add(testBalancerIDs[0], 1, rrBuilder)
bg.Add(testBalancerIDs[0], 1, rrBuilder)
// Send one resolved address.
bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1])
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1])
// Send subconn state change.
sc1 := <-cc.newSubConnCh
bg.handleSubConnStateChange(sc1, connectivity.Connecting)
bg.handleSubConnStateChange(sc1, connectivity.Ready)
sc1 := <-cc.NewSubConnCh
bg.HandleSubConnStateChange(sc1, connectivity.Connecting)
bg.HandleSubConnStateChange(sc1, connectivity.Ready)
// Test pick with one backend.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testSubConn{})) {
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
}
// Send two addresses.
bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
// Expect one new subconn, send state update.
sc2 := <-cc.newSubConnCh
bg.handleSubConnStateChange(sc2, connectivity.Connecting)
bg.handleSubConnStateChange(sc2, connectivity.Ready)
sc2 := <-cc.NewSubConnCh
bg.HandleSubConnStateChange(sc2, connectivity.Connecting)
bg.HandleSubConnStateChange(sc2, connectivity.Ready)
// Test roundrobin pick.
p2 := <-cc.newPickerCh
p2 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc2}
if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Remove the first address.
bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[1:2])
scToRemove := <-cc.removeSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testSubConn{})) {
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[1:2])
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
}
bg.handleSubConnStateChange(scToRemove, connectivity.Shutdown)
bg.HandleSubConnStateChange(scToRemove, connectivity.Shutdown)
// Test pick with only the second subconn.
p3 := <-cc.newPickerCh
p3 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSC, _ := p3.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSC.SubConn, sc2, cmp.AllowUnexported(testSubConn{})) {
if !cmp.Equal(gotSC.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSC, sc2)
}
}
@ -114,103 +115,103 @@ func (s) TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) {
// 2 balancers, each with 1 backend.
func (s) TestBalancerGroup_TwoRR_OneBackend(t *testing.T) {
cc := newTestClientConn(t)
bg := newBalancerGroup(cc, nil, nil)
bg.start()
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
bg.Start()
// Add two balancers to group and send one resolved address to both
// balancers.
bg.add(testBalancerIDs[0], 1, rrBuilder)
bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1])
sc1 := <-cc.newSubConnCh
bg.Add(testBalancerIDs[0], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1])
sc1 := <-cc.NewSubConnCh
bg.add(testBalancerIDs[1], 1, rrBuilder)
bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[0:1])
sc2 := <-cc.newSubConnCh
bg.Add(testBalancerIDs[1], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[0:1])
sc2 := <-cc.NewSubConnCh
// Send state changes for both subconns.
bg.handleSubConnStateChange(sc1, connectivity.Connecting)
bg.handleSubConnStateChange(sc1, connectivity.Ready)
bg.handleSubConnStateChange(sc2, connectivity.Connecting)
bg.handleSubConnStateChange(sc2, connectivity.Ready)
bg.HandleSubConnStateChange(sc1, connectivity.Connecting)
bg.HandleSubConnStateChange(sc1, connectivity.Ready)
bg.HandleSubConnStateChange(sc2, connectivity.Connecting)
bg.HandleSubConnStateChange(sc2, connectivity.Ready)
// Test roundrobin on the last picker.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc2}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
// 2 balancers, each with more than 1 backends.
func (s) TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) {
cc := newTestClientConn(t)
bg := newBalancerGroup(cc, nil, nil)
bg.start()
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
bg.Start()
// Add two balancers to group and send one resolved address to both
// balancers.
bg.add(testBalancerIDs[0], 1, rrBuilder)
bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
sc1 := <-cc.newSubConnCh
sc2 := <-cc.newSubConnCh
bg.Add(testBalancerIDs[0], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
sc1 := <-cc.NewSubConnCh
sc2 := <-cc.NewSubConnCh
bg.add(testBalancerIDs[1], 1, rrBuilder)
bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
sc3 := <-cc.newSubConnCh
sc4 := <-cc.newSubConnCh
bg.Add(testBalancerIDs[1], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
sc3 := <-cc.NewSubConnCh
sc4 := <-cc.NewSubConnCh
// Send state changes for both subconns.
bg.handleSubConnStateChange(sc1, connectivity.Connecting)
bg.handleSubConnStateChange(sc1, connectivity.Ready)
bg.handleSubConnStateChange(sc2, connectivity.Connecting)
bg.handleSubConnStateChange(sc2, connectivity.Ready)
bg.handleSubConnStateChange(sc3, connectivity.Connecting)
bg.handleSubConnStateChange(sc3, connectivity.Ready)
bg.handleSubConnStateChange(sc4, connectivity.Connecting)
bg.handleSubConnStateChange(sc4, connectivity.Ready)
bg.HandleSubConnStateChange(sc1, connectivity.Connecting)
bg.HandleSubConnStateChange(sc1, connectivity.Ready)
bg.HandleSubConnStateChange(sc2, connectivity.Connecting)
bg.HandleSubConnStateChange(sc2, connectivity.Ready)
bg.HandleSubConnStateChange(sc3, connectivity.Connecting)
bg.HandleSubConnStateChange(sc3, connectivity.Ready)
bg.HandleSubConnStateChange(sc4, connectivity.Connecting)
bg.HandleSubConnStateChange(sc4, connectivity.Ready)
// Test roundrobin on the last picker.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc2, sc3, sc4}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Turn sc2's connection down, should be RR between balancers.
bg.handleSubConnStateChange(sc2, connectivity.TransientFailure)
p2 := <-cc.newPickerCh
bg.HandleSubConnStateChange(sc2, connectivity.TransientFailure)
p2 := <-cc.NewPickerCh
// Expect two sc1's in the result, because balancer1 will be picked twice,
// but there's only one sc in it.
want = []balancer.SubConn{sc1, sc1, sc3, sc4}
if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Remove sc3's addresses.
bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[3:4])
scToRemove := <-cc.removeSubConnCh
if !cmp.Equal(scToRemove, sc3, cmp.AllowUnexported(testSubConn{})) {
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[3:4])
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc3, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc3, scToRemove)
}
bg.handleSubConnStateChange(scToRemove, connectivity.Shutdown)
p3 := <-cc.newPickerCh
bg.HandleSubConnStateChange(scToRemove, connectivity.Shutdown)
p3 := <-cc.NewPickerCh
want = []balancer.SubConn{sc1, sc4}
if err := isRoundRobin(want, subConnFromPicker(p3)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p3)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Turn sc1's connection down.
bg.handleSubConnStateChange(sc1, connectivity.TransientFailure)
p4 := <-cc.newPickerCh
bg.HandleSubConnStateChange(sc1, connectivity.TransientFailure)
p4 := <-cc.NewPickerCh
want = []balancer.SubConn{sc4}
if err := isRoundRobin(want, subConnFromPicker(p4)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p4)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Turn last connection to connecting.
bg.handleSubConnStateChange(sc4, connectivity.Connecting)
p5 := <-cc.newPickerCh
bg.HandleSubConnStateChange(sc4, connectivity.Connecting)
p5 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := p5.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable {
t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err)
@ -218,8 +219,8 @@ func (s) TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) {
}
// Turn all connections down.
bg.handleSubConnStateChange(sc4, connectivity.TransientFailure)
p6 := <-cc.newPickerCh
bg.HandleSubConnStateChange(sc4, connectivity.TransientFailure)
p6 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := p6.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure {
t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err)
@ -229,95 +230,95 @@ func (s) TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) {
// 2 balancers with different weights.
func (s) TestBalancerGroup_TwoRR_DifferentWeight_MoreBackends(t *testing.T) {
cc := newTestClientConn(t)
bg := newBalancerGroup(cc, nil, nil)
bg.start()
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
bg.Start()
// Add two balancers to group and send two resolved addresses to both
// balancers.
bg.add(testBalancerIDs[0], 2, rrBuilder)
bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
sc1 := <-cc.newSubConnCh
sc2 := <-cc.newSubConnCh
bg.Add(testBalancerIDs[0], 2, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
sc1 := <-cc.NewSubConnCh
sc2 := <-cc.NewSubConnCh
bg.add(testBalancerIDs[1], 1, rrBuilder)
bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
sc3 := <-cc.newSubConnCh
sc4 := <-cc.newSubConnCh
bg.Add(testBalancerIDs[1], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
sc3 := <-cc.NewSubConnCh
sc4 := <-cc.NewSubConnCh
// Send state changes for both subconns.
bg.handleSubConnStateChange(sc1, connectivity.Connecting)
bg.handleSubConnStateChange(sc1, connectivity.Ready)
bg.handleSubConnStateChange(sc2, connectivity.Connecting)
bg.handleSubConnStateChange(sc2, connectivity.Ready)
bg.handleSubConnStateChange(sc3, connectivity.Connecting)
bg.handleSubConnStateChange(sc3, connectivity.Ready)
bg.handleSubConnStateChange(sc4, connectivity.Connecting)
bg.handleSubConnStateChange(sc4, connectivity.Ready)
bg.HandleSubConnStateChange(sc1, connectivity.Connecting)
bg.HandleSubConnStateChange(sc1, connectivity.Ready)
bg.HandleSubConnStateChange(sc2, connectivity.Connecting)
bg.HandleSubConnStateChange(sc2, connectivity.Ready)
bg.HandleSubConnStateChange(sc3, connectivity.Connecting)
bg.HandleSubConnStateChange(sc3, connectivity.Ready)
bg.HandleSubConnStateChange(sc4, connectivity.Connecting)
bg.HandleSubConnStateChange(sc4, connectivity.Ready)
// Test roundrobin on the last picker.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
// totally 3 balancers, add/remove balancer.
func (s) TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) {
cc := newTestClientConn(t)
bg := newBalancerGroup(cc, nil, nil)
bg.start()
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
bg.Start()
// Add three balancers to group and send one resolved address to both
// balancers.
bg.add(testBalancerIDs[0], 1, rrBuilder)
bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1])
sc1 := <-cc.newSubConnCh
bg.Add(testBalancerIDs[0], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1])
sc1 := <-cc.NewSubConnCh
bg.add(testBalancerIDs[1], 1, rrBuilder)
bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[1:2])
sc2 := <-cc.newSubConnCh
bg.Add(testBalancerIDs[1], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[1:2])
sc2 := <-cc.NewSubConnCh
bg.add(testBalancerIDs[2], 1, rrBuilder)
bg.handleResolvedAddrs(testBalancerIDs[2], testBackendAddrs[1:2])
sc3 := <-cc.newSubConnCh
bg.Add(testBalancerIDs[2], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[2], testBackendAddrs[1:2])
sc3 := <-cc.NewSubConnCh
// Send state changes for both subconns.
bg.handleSubConnStateChange(sc1, connectivity.Connecting)
bg.handleSubConnStateChange(sc1, connectivity.Ready)
bg.handleSubConnStateChange(sc2, connectivity.Connecting)
bg.handleSubConnStateChange(sc2, connectivity.Ready)
bg.handleSubConnStateChange(sc3, connectivity.Connecting)
bg.handleSubConnStateChange(sc3, connectivity.Ready)
bg.HandleSubConnStateChange(sc1, connectivity.Connecting)
bg.HandleSubConnStateChange(sc1, connectivity.Ready)
bg.HandleSubConnStateChange(sc2, connectivity.Connecting)
bg.HandleSubConnStateChange(sc2, connectivity.Ready)
bg.HandleSubConnStateChange(sc3, connectivity.Connecting)
bg.HandleSubConnStateChange(sc3, connectivity.Ready)
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc2, sc3}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Remove the second balancer, while the others two are ready.
bg.remove(testBalancerIDs[1])
scToRemove := <-cc.removeSubConnCh
if !cmp.Equal(scToRemove, sc2, cmp.AllowUnexported(testSubConn{})) {
bg.Remove(testBalancerIDs[1])
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove)
}
p2 := <-cc.newPickerCh
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{sc1, sc3}
if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// move balancer 3 into transient failure.
bg.handleSubConnStateChange(sc3, connectivity.TransientFailure)
bg.HandleSubConnStateChange(sc3, connectivity.TransientFailure)
// Remove the first balancer, while the third is transient failure.
bg.remove(testBalancerIDs[0])
scToRemove = <-cc.removeSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testSubConn{})) {
bg.Remove(testBalancerIDs[0])
scToRemove = <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
}
p3 := <-cc.newPickerCh
p3 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := p3.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure {
t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err)
@ -327,90 +328,90 @@ func (s) TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) {
// 2 balancers, change balancer weight.
func (s) TestBalancerGroup_TwoRR_ChangeWeight_MoreBackends(t *testing.T) {
cc := newTestClientConn(t)
bg := newBalancerGroup(cc, nil, nil)
bg.start()
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
bg.Start()
// Add two balancers to group and send two resolved addresses to both
// balancers.
bg.add(testBalancerIDs[0], 2, rrBuilder)
bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
sc1 := <-cc.newSubConnCh
sc2 := <-cc.newSubConnCh
bg.Add(testBalancerIDs[0], 2, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
sc1 := <-cc.NewSubConnCh
sc2 := <-cc.NewSubConnCh
bg.add(testBalancerIDs[1], 1, rrBuilder)
bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
sc3 := <-cc.newSubConnCh
sc4 := <-cc.newSubConnCh
bg.Add(testBalancerIDs[1], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
sc3 := <-cc.NewSubConnCh
sc4 := <-cc.NewSubConnCh
// Send state changes for both subconns.
bg.handleSubConnStateChange(sc1, connectivity.Connecting)
bg.handleSubConnStateChange(sc1, connectivity.Ready)
bg.handleSubConnStateChange(sc2, connectivity.Connecting)
bg.handleSubConnStateChange(sc2, connectivity.Ready)
bg.handleSubConnStateChange(sc3, connectivity.Connecting)
bg.handleSubConnStateChange(sc3, connectivity.Ready)
bg.handleSubConnStateChange(sc4, connectivity.Connecting)
bg.handleSubConnStateChange(sc4, connectivity.Ready)
bg.HandleSubConnStateChange(sc1, connectivity.Connecting)
bg.HandleSubConnStateChange(sc1, connectivity.Ready)
bg.HandleSubConnStateChange(sc2, connectivity.Connecting)
bg.HandleSubConnStateChange(sc2, connectivity.Ready)
bg.HandleSubConnStateChange(sc3, connectivity.Connecting)
bg.HandleSubConnStateChange(sc3, connectivity.Ready)
bg.HandleSubConnStateChange(sc4, connectivity.Connecting)
bg.HandleSubConnStateChange(sc4, connectivity.Ready)
// Test roundrobin on the last picker.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
bg.changeWeight(testBalancerIDs[0], 3)
bg.ChangeWeight(testBalancerIDs[0], 3)
// Test roundrobin with new weight.
p2 := <-cc.newPickerCh
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc2, sc3, sc4}
if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
func (s) TestBalancerGroup_LoadReport(t *testing.T) {
testLoadStore := newTestLoadStore()
testLoadStore := testutils.NewTestLoadStore()
cc := newTestClientConn(t)
bg := newBalancerGroup(cc, testLoadStore, nil)
bg.start()
cc := testutils.NewTestClientConn(t)
bg := New(cc, testLoadStore, nil)
bg.Start()
backendToBalancerID := make(map[balancer.SubConn]internal.Locality)
// Add two balancers to group and send two resolved addresses to both
// balancers.
bg.add(testBalancerIDs[0], 2, rrBuilder)
bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
sc1 := <-cc.newSubConnCh
sc2 := <-cc.newSubConnCh
bg.Add(testBalancerIDs[0], 2, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
sc1 := <-cc.NewSubConnCh
sc2 := <-cc.NewSubConnCh
backendToBalancerID[sc1] = testBalancerIDs[0]
backendToBalancerID[sc2] = testBalancerIDs[0]
bg.add(testBalancerIDs[1], 1, rrBuilder)
bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
sc3 := <-cc.newSubConnCh
sc4 := <-cc.newSubConnCh
bg.Add(testBalancerIDs[1], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
sc3 := <-cc.NewSubConnCh
sc4 := <-cc.NewSubConnCh
backendToBalancerID[sc3] = testBalancerIDs[1]
backendToBalancerID[sc4] = testBalancerIDs[1]
// Send state changes for both subconns.
bg.handleSubConnStateChange(sc1, connectivity.Connecting)
bg.handleSubConnStateChange(sc1, connectivity.Ready)
bg.handleSubConnStateChange(sc2, connectivity.Connecting)
bg.handleSubConnStateChange(sc2, connectivity.Ready)
bg.handleSubConnStateChange(sc3, connectivity.Connecting)
bg.handleSubConnStateChange(sc3, connectivity.Ready)
bg.handleSubConnStateChange(sc4, connectivity.Connecting)
bg.handleSubConnStateChange(sc4, connectivity.Ready)
bg.HandleSubConnStateChange(sc1, connectivity.Connecting)
bg.HandleSubConnStateChange(sc1, connectivity.Ready)
bg.HandleSubConnStateChange(sc2, connectivity.Connecting)
bg.HandleSubConnStateChange(sc2, connectivity.Ready)
bg.HandleSubConnStateChange(sc3, connectivity.Connecting)
bg.HandleSubConnStateChange(sc3, connectivity.Ready)
bg.HandleSubConnStateChange(sc4, connectivity.Connecting)
bg.HandleSubConnStateChange(sc4, connectivity.Ready)
// Test roundrobin on the last picker.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
var (
wantStart []internal.Locality
wantEnd []internal.Locality
wantCost []testServerLoad
wantCost []testutils.TestServerLoad
)
for i := 0; i < 10; i++ {
scst, _ := p1.Pick(balancer.PickInfo{})
@ -427,21 +428,21 @@ func (s) TestBalancerGroup_LoadReport(t *testing.T) {
})
wantEnd = append(wantEnd, locality)
wantCost = append(wantCost,
testServerLoad{name: serverLoadCPUName, d: 10},
testServerLoad{name: serverLoadMemoryName, d: 5},
testServerLoad{name: "pic", d: 3.14},
testServerLoad{name: "piu", d: 3.14})
testutils.TestServerLoad{Name: serverLoadCPUName, D: 10},
testutils.TestServerLoad{Name: serverLoadMemoryName, D: 5},
testutils.TestServerLoad{Name: "pic", D: 3.14},
testutils.TestServerLoad{Name: "piu", D: 3.14})
}
}
if !cmp.Equal(testLoadStore.callsStarted, wantStart) {
t.Fatalf("want started: %v, got: %v", testLoadStore.callsStarted, wantStart)
if !cmp.Equal(testLoadStore.CallsStarted, wantStart) {
t.Fatalf("want started: %v, got: %v", testLoadStore.CallsStarted, wantStart)
}
if !cmp.Equal(testLoadStore.callsEnded, wantEnd) {
t.Fatalf("want ended: %v, got: %v", testLoadStore.callsEnded, wantEnd)
if !cmp.Equal(testLoadStore.CallsEnded, wantEnd) {
t.Fatalf("want ended: %v, got: %v", testLoadStore.CallsEnded, wantEnd)
}
if !cmp.Equal(testLoadStore.callsCost, wantCost, cmp.AllowUnexported(testServerLoad{})) {
t.Fatalf("want cost: %v, got: %v", testLoadStore.callsCost, wantCost)
if !cmp.Equal(testLoadStore.CallsCost, wantCost, cmp.AllowUnexported(testutils.TestServerLoad{})) {
t.Fatalf("want cost: %v, got: %v", testLoadStore.CallsCost, wantCost)
}
}
@ -455,73 +456,73 @@ func (s) TestBalancerGroup_LoadReport(t *testing.T) {
// - b3, weight 1, backends [1,2]
// Start the balancer group again and check for behavior.
func (s) TestBalancerGroup_start_close(t *testing.T) {
cc := newTestClientConn(t)
bg := newBalancerGroup(cc, nil, nil)
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
// Add two balancers to group and send two resolved addresses to both
// balancers.
bg.add(testBalancerIDs[0], 2, rrBuilder)
bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
bg.add(testBalancerIDs[1], 1, rrBuilder)
bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
bg.Add(testBalancerIDs[0], 2, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
bg.Add(testBalancerIDs[1], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
bg.start()
bg.Start()
m1 := make(map[resolver.Address]balancer.SubConn)
for i := 0; i < 4; i++ {
addrs := <-cc.newSubConnAddrsCh
sc := <-cc.newSubConnCh
addrs := <-cc.NewSubConnAddrsCh
sc := <-cc.NewSubConnCh
m1[addrs[0]] = sc
bg.handleSubConnStateChange(sc, connectivity.Connecting)
bg.handleSubConnStateChange(sc, connectivity.Ready)
bg.HandleSubConnStateChange(sc, connectivity.Connecting)
bg.HandleSubConnStateChange(sc, connectivity.Ready)
}
// Test roundrobin on the last picker.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{
m1[testBackendAddrs[0]], m1[testBackendAddrs[0]],
m1[testBackendAddrs[1]], m1[testBackendAddrs[1]],
m1[testBackendAddrs[2]], m1[testBackendAddrs[3]],
}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
bg.close()
bg.Close()
for i := 0; i < 4; i++ {
bg.handleSubConnStateChange(<-cc.removeSubConnCh, connectivity.Shutdown)
bg.HandleSubConnStateChange(<-cc.RemoveSubConnCh, connectivity.Shutdown)
}
// Add b3, weight 1, backends [1,2].
bg.add(testBalancerIDs[2], 1, rrBuilder)
bg.handleResolvedAddrs(testBalancerIDs[2], testBackendAddrs[1:3])
bg.Add(testBalancerIDs[2], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[2], testBackendAddrs[1:3])
// Remove b1.
bg.remove(testBalancerIDs[0])
bg.Remove(testBalancerIDs[0])
// Update b2 to weight 3, backends [0,3].
bg.changeWeight(testBalancerIDs[1], 3)
bg.handleResolvedAddrs(testBalancerIDs[1], append([]resolver.Address(nil), testBackendAddrs[0], testBackendAddrs[3]))
bg.ChangeWeight(testBalancerIDs[1], 3)
bg.HandleResolvedAddrs(testBalancerIDs[1], append([]resolver.Address(nil), testBackendAddrs[0], testBackendAddrs[3]))
bg.start()
bg.Start()
m2 := make(map[resolver.Address]balancer.SubConn)
for i := 0; i < 4; i++ {
addrs := <-cc.newSubConnAddrsCh
sc := <-cc.newSubConnCh
addrs := <-cc.NewSubConnAddrsCh
sc := <-cc.NewSubConnCh
m2[addrs[0]] = sc
bg.handleSubConnStateChange(sc, connectivity.Connecting)
bg.handleSubConnStateChange(sc, connectivity.Ready)
bg.HandleSubConnStateChange(sc, connectivity.Connecting)
bg.HandleSubConnStateChange(sc, connectivity.Ready)
}
// Test roundrobin on the last picker.
p2 := <-cc.newPickerCh
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{
m2[testBackendAddrs[0]], m2[testBackendAddrs[0]], m2[testBackendAddrs[0]],
m2[testBackendAddrs[3]], m2[testBackendAddrs[3]], m2[testBackendAddrs[3]],
m2[testBackendAddrs[1]], m2[testBackendAddrs[2]],
}
if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
@ -539,21 +540,21 @@ func (s) TestBalancerGroup_start_close(t *testing.T) {
// whenever it gets an address update. It's expected that start() doesn't block
// because of deadlock.
func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
cc := newTestClientConn(t)
bg := newBalancerGroup(cc, nil, nil)
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
bg.add(testBalancerIDs[0], 2, &testConstBalancerBuilder{})
bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
bg.add(testBalancerIDs[1], 1, &testConstBalancerBuilder{})
bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
bg.Add(testBalancerIDs[0], 2, &testutils.TestConstBalancerBuilder{})
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
bg.Add(testBalancerIDs[1], 1, &testutils.TestConstBalancerBuilder{})
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
bg.start()
bg.Start()
}
func replaceDefaultSubBalancerCloseTimeout(n time.Duration) func() {
old := defaultSubBalancerCloseTimeout
defaultSubBalancerCloseTimeout = n
return func() { defaultSubBalancerCloseTimeout = old }
old := DefaultSubBalancerCloseTimeout
DefaultSubBalancerCloseTimeout = n
return func() { DefaultSubBalancerCloseTimeout = old }
}
// initBalancerGroupForCachingTest creates a balancer group, and initialize it
@ -562,56 +563,56 @@ func replaceDefaultSubBalancerCloseTimeout(n time.Duration) func() {
// Two rr balancers are added to bg, each with 2 ready subConns. A sub-balancer
// is removed later, so the balancer group returned has one sub-balancer in its
// own map, and one sub-balancer in cache.
func initBalancerGroupForCachingTest(t *testing.T) (*balancerGroup, *testClientConn, map[resolver.Address]balancer.SubConn) {
cc := newTestClientConn(t)
bg := newBalancerGroup(cc, nil, nil)
func initBalancerGroupForCachingTest(t *testing.T) (*BalancerGroup, *testutils.TestClientConn, map[resolver.Address]balancer.SubConn) {
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
// Add two balancers to group and send two resolved addresses to both
// balancers.
bg.add(testBalancerIDs[0], 2, rrBuilder)
bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
bg.add(testBalancerIDs[1], 1, rrBuilder)
bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
bg.Add(testBalancerIDs[0], 2, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
bg.Add(testBalancerIDs[1], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
bg.start()
bg.Start()
m1 := make(map[resolver.Address]balancer.SubConn)
for i := 0; i < 4; i++ {
addrs := <-cc.newSubConnAddrsCh
sc := <-cc.newSubConnCh
addrs := <-cc.NewSubConnAddrsCh
sc := <-cc.NewSubConnCh
m1[addrs[0]] = sc
bg.handleSubConnStateChange(sc, connectivity.Connecting)
bg.handleSubConnStateChange(sc, connectivity.Ready)
bg.HandleSubConnStateChange(sc, connectivity.Connecting)
bg.HandleSubConnStateChange(sc, connectivity.Ready)
}
// Test roundrobin on the last picker.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{
m1[testBackendAddrs[0]], m1[testBackendAddrs[0]],
m1[testBackendAddrs[1]], m1[testBackendAddrs[1]],
m1[testBackendAddrs[2]], m1[testBackendAddrs[3]],
}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
bg.remove(testBalancerIDs[1])
bg.Remove(testBalancerIDs[1])
// Don't wait for SubConns to be removed after close, because they are only
// removed after close timeout.
for i := 0; i < 10; i++ {
select {
case <-cc.removeSubConnCh:
case <-cc.RemoveSubConnCh:
t.Fatalf("Got request to remove subconn, want no remove subconn (because subconns were still in cache)")
default:
}
time.Sleep(time.Millisecond)
}
// Test roundrobin on the with only sub-balancer0.
p2 := <-cc.newPickerCh
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{
m1[testBackendAddrs[0]], m1[testBackendAddrs[1]],
}
if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
@ -626,10 +627,10 @@ func (s) TestBalancerGroup_locality_caching(t *testing.T) {
// Turn down subconn for addr2, shouldn't get picker update because
// sub-balancer1 was removed.
bg.handleSubConnStateChange(addrToSC[testBackendAddrs[2]], connectivity.TransientFailure)
bg.HandleSubConnStateChange(addrToSC[testBackendAddrs[2]], connectivity.TransientFailure)
for i := 0; i < 10; i++ {
select {
case <-cc.newPickerCh:
case <-cc.NewPickerCh:
t.Fatalf("Got new picker, want no new picker (because the sub-balancer was removed)")
default:
}
@ -642,22 +643,22 @@ func (s) TestBalancerGroup_locality_caching(t *testing.T) {
// Re-add sub-balancer-1, because subconns were in cache, no new subconns
// should be created. But a new picker will still be generated, with subconn
// states update to date.
bg.add(testBalancerIDs[1], 1, rrBuilder)
bg.Add(testBalancerIDs[1], 1, rrBuilder)
p3 := <-cc.newPickerCh
p3 := <-cc.NewPickerCh
want := []balancer.SubConn{
addrToSC[testBackendAddrs[0]], addrToSC[testBackendAddrs[0]],
addrToSC[testBackendAddrs[1]], addrToSC[testBackendAddrs[1]],
// addr2 is down, b2 only has addr3 in READY state.
addrToSC[testBackendAddrs[3]], addrToSC[testBackendAddrs[3]],
}
if err := isRoundRobin(want, subConnFromPicker(p3)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p3)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
for i := 0; i < 10; i++ {
select {
case <-cc.newSubConnAddrsCh:
case <-cc.NewSubConnAddrsCh:
t.Fatalf("Got new subconn, want no new subconn (because subconns were still in cache)")
default:
}
@ -672,7 +673,7 @@ func (s) TestBalancerGroup_locality_caching_close_group(t *testing.T) {
defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)()
bg, cc, addrToSC := initBalancerGroupForCachingTest(t)
bg.close()
bg.Close()
// The balancer group is closed. The subconns should be removed immediately.
removeTimeout := time.After(time.Millisecond * 500)
scToRemove := map[balancer.SubConn]int{
@ -683,7 +684,7 @@ func (s) TestBalancerGroup_locality_caching_close_group(t *testing.T) {
}
for i := 0; i < len(scToRemove); i++ {
select {
case sc := <-cc.removeSubConnCh:
case sc := <-cc.RemoveSubConnCh:
c := scToRemove[sc]
if c == 0 {
t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c)
@ -703,14 +704,14 @@ func (s) TestBalancerGroup_locality_caching_not_readd_within_timeout(t *testing.
// The sub-balancer is not re-added withtin timeout. The subconns should be
// removed.
removeTimeout := time.After(defaultSubBalancerCloseTimeout)
removeTimeout := time.After(DefaultSubBalancerCloseTimeout)
scToRemove := map[balancer.SubConn]int{
addrToSC[testBackendAddrs[2]]: 1,
addrToSC[testBackendAddrs[3]]: 1,
}
for i := 0; i < len(scToRemove); i++ {
select {
case sc := <-cc.removeSubConnCh:
case sc := <-cc.RemoveSubConnCh:
c := scToRemove[sc]
if c == 0 {
t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c)
@ -737,7 +738,7 @@ func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *test
// sub-balancer was still in cache, but cann't be reused. This should cause
// old sub-balancer's subconns to be removed immediately, and new subconns
// to be created.
bg.add(testBalancerIDs[1], 1, &noopBalancerBuilderWrapper{rrBuilder})
bg.Add(testBalancerIDs[1], 1, &noopBalancerBuilderWrapper{rrBuilder})
// The cached sub-balancer should be closed, and the subconns should be
// removed immediately.
@ -748,7 +749,7 @@ func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *test
}
for i := 0; i < len(scToRemove); i++ {
select {
case sc := <-cc.removeSubConnCh:
case sc := <-cc.RemoveSubConnCh:
c := scToRemove[sc]
if c == 0 {
t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c)
@ -759,7 +760,7 @@ func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *test
}
}
bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[4:6])
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[4:6])
newSCTimeout := time.After(time.Millisecond * 500)
scToAdd := map[resolver.Address]int{
@ -768,29 +769,29 @@ func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *test
}
for i := 0; i < len(scToAdd); i++ {
select {
case addr := <-cc.newSubConnAddrsCh:
case addr := <-cc.NewSubConnAddrsCh:
c := scToAdd[addr[0]]
if c == 0 {
t.Fatalf("Got newSubConn for %v when there's %d new expected", addr, c)
}
scToAdd[addr[0]] = c - 1
sc := <-cc.newSubConnCh
sc := <-cc.NewSubConnCh
addrToSC[addr[0]] = sc
bg.handleSubConnStateChange(sc, connectivity.Connecting)
bg.handleSubConnStateChange(sc, connectivity.Ready)
bg.HandleSubConnStateChange(sc, connectivity.Connecting)
bg.HandleSubConnStateChange(sc, connectivity.Ready)
case <-newSCTimeout:
t.Fatalf("timeout waiting for subConns (from new sub-balancer) to be newed")
}
}
// Test roundrobin on the new picker.
p3 := <-cc.newPickerCh
p3 := <-cc.NewPickerCh
want := []balancer.SubConn{
addrToSC[testBackendAddrs[0]], addrToSC[testBackendAddrs[0]],
addrToSC[testBackendAddrs[1]], addrToSC[testBackendAddrs[1]],
addrToSC[testBackendAddrs[4]], addrToSC[testBackendAddrs[5]],
}
if err := isRoundRobin(want, subConnFromPicker(p3)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p3)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}

View File

@ -0,0 +1,38 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package balancergroup
import (
"testing"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/xds/internal/testutils"
)
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
func init() {
NewRandomWRR = testutils.NewTestWRR
}

View File

@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/balancer/lrs"
xdsclient "google.golang.org/grpc/xds/internal/client"
)
@ -46,7 +47,7 @@ type localityConfig struct {
// balancerGroupWithConfig contains the localities with the same priority. It
// manages all localities using a balancerGroup.
type balancerGroupWithConfig struct {
bg *balancerGroup
bg *balancergroup.BalancerGroup
configs map[internal.Locality]*localityConfig
}
@ -136,9 +137,9 @@ func (edsImpl *edsBalancerImpl) HandleChildPolicy(name string, config json.RawMe
// TODO: (eds) add support to balancer group to support smoothly
// switching sub-balancers (keep old balancer around until new
// balancer becomes ready).
bgwc.bg.remove(id)
bgwc.bg.add(id, config.weight, edsImpl.subBalancerBuilder)
bgwc.bg.handleResolvedAddrs(id, config.addrs)
bgwc.bg.Remove(id)
bgwc.bg.Add(id, config.weight, edsImpl.subBalancerBuilder)
bgwc.bg.HandleResolvedAddrs(id, config.addrs)
}
}
}
@ -217,7 +218,7 @@ func (edsImpl *edsBalancerImpl) HandleEDSResponse(edsResp *xdsclient.EDSUpdate)
// be started when necessary (e.g. when higher is down, or if it's a
// new lowest priority).
bgwc = &balancerGroupWithConfig{
bg: newBalancerGroup(edsImpl.ccWrapperWithPriority(priority), edsImpl.loadStore, edsImpl.logger),
bg: balancergroup.New(edsImpl.ccWrapperWithPriority(priority), edsImpl.loadStore, edsImpl.logger),
configs: make(map[internal.Locality]*localityConfig),
}
edsImpl.priorityToLocalities[priority] = bgwc
@ -233,7 +234,7 @@ func (edsImpl *edsBalancerImpl) HandleEDSResponse(edsResp *xdsclient.EDSUpdate)
for p, bgwc := range edsImpl.priorityToLocalities {
if _, ok := newLocalitiesWithPriority[p]; !ok {
delete(edsImpl.priorityToLocalities, p)
bgwc.bg.close()
bgwc.bg.Close()
delete(edsImpl.priorityToState, p)
priorityChanged = true
edsImpl.logger.Infof("Priority %v deleted", p)
@ -284,7 +285,7 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup
config, ok := bgwc.configs[lid]
if !ok {
// A new balancer, add it to balancer group and balancer map.
bgwc.bg.add(lid, newWeight, edsImpl.subBalancerBuilder)
bgwc.bg.Add(lid, newWeight, edsImpl.subBalancerBuilder)
config = &localityConfig{
weight: newWeight,
}
@ -307,19 +308,19 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup
if weightChanged {
config.weight = newWeight
bgwc.bg.changeWeight(lid, newWeight)
bgwc.bg.ChangeWeight(lid, newWeight)
}
if addrsChanged {
config.addrs = newAddrs
bgwc.bg.handleResolvedAddrs(lid, newAddrs)
bgwc.bg.HandleResolvedAddrs(lid, newAddrs)
}
}
// Delete localities that are removed in the latest response.
for lid := range bgwc.configs {
if _, ok := newLocalitiesSet[lid]; !ok {
bgwc.bg.remove(lid)
bgwc.bg.Remove(lid)
delete(bgwc.configs, lid)
edsImpl.logger.Infof("Locality %v deleted", lid)
}
@ -343,7 +344,7 @@ func (edsImpl *edsBalancerImpl) HandleSubConnStateChange(sc balancer.SubConn, s
return
}
if bg := bgwc.bg; bg != nil {
bg.handleSubConnStateChange(sc, s)
bg.HandleSubConnStateChange(sc, s)
}
}
@ -407,7 +408,7 @@ func (edsImpl *edsBalancerImpl) newSubConn(priority priorityType, addrs []resolv
func (edsImpl *edsBalancerImpl) Close() {
for _, bgwc := range edsImpl.priorityToLocalities {
if bg := bgwc.bg; bg != nil {
bg.close()
bg.Close()
}
}
}

View File

@ -102,7 +102,7 @@ func (edsImpl *edsBalancerImpl) startPriority(priority priorityType) {
// currently avoided by handling balancer update in a goroutine (the run
// goroutine in the parent eds balancer). When priority balancer is split
// into its own, this asynchronous state handling needs to be copied.
p.bg.start()
p.bg.Start()
// startPriority can be called when
// 1. first EDS resp, start p0
// 2. a high priority goes Failure, start next
@ -189,7 +189,7 @@ func (edsImpl *edsBalancerImpl) handlePriorityWithNewStateReady(priority priorit
edsImpl.logger.Infof("Switching priority from %v to %v, because latter became Ready", edsImpl.priorityInUse, priority)
edsImpl.priorityInUse = priority
for i := priority.nextLower(); !i.lowerThan(edsImpl.priorityLowest); i = i.nextLower() {
edsImpl.priorityToLocalities[i].bg.close()
edsImpl.priorityToLocalities[i].bg.Close()
}
return true
}

View File

@ -26,6 +26,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/testutils"
)
// When a high priority is ready, adding/removing lower locality doesn't cause
@ -33,7 +34,7 @@ import (
//
// Init 0 and 1; 0 is up, use 0; add 2, use 0; remove 2, use 0.
func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
cc := newTestClientConn(t)
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
@ -43,20 +44,20 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
addrs1 := <-cc.newSubConnAddrsCh
addrs1 := <-cc.NewSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.newSubConnCh
sc1 := <-cc.NewSubConnCh
// p0 is ready.
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)
// Test roundrobin with only p0 subconns.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
@ -68,11 +69,11 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build()))
select {
case <-cc.newPickerCh:
case <-cc.NewPickerCh:
t.Fatalf("got unexpected new picker")
case <-cc.newSubConnCh:
case <-cc.NewSubConnCh:
t.Fatalf("got unexpected new SubConn")
case <-cc.removeSubConnCh:
case <-cc.RemoveSubConnCh:
t.Fatalf("got unexpected remove SubConn")
case <-time.After(time.Millisecond * 100):
}
@ -84,11 +85,11 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab3.Build()))
select {
case <-cc.newPickerCh:
case <-cc.NewPickerCh:
t.Fatalf("got unexpected new picker")
case <-cc.newSubConnCh:
case <-cc.NewSubConnCh:
t.Fatalf("got unexpected new SubConn")
case <-cc.removeSubConnCh:
case <-cc.RemoveSubConnCh:
t.Fatalf("got unexpected remove SubConn")
case <-time.After(time.Millisecond * 100):
}
@ -99,7 +100,7 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
// Init 0 and 1; 0 is up, use 0; 0 is down, 1 is up, use 1; add 2, use 1; 1 is
// down, use 2; remove 2, use 1.
func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
cc := newTestClientConn(t)
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
@ -109,38 +110,38 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
addrs0 := <-cc.newSubConnAddrsCh
addrs0 := <-cc.NewSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc0 := <-cc.newSubConnCh
sc0 := <-cc.NewSubConnCh
// p0 is ready.
edsb.HandleSubConnStateChange(sc0, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc0, connectivity.Ready)
// Test roundrobin with only p0 subconns.
p0 := <-cc.newPickerCh
p0 := <-cc.NewPickerCh
want := []balancer.SubConn{sc0}
if err := isRoundRobin(want, subConnFromPicker(p0)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p0)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Turn down 0, 1 is used.
edsb.HandleSubConnStateChange(sc0, connectivity.TransientFailure)
addrs1 := <-cc.newSubConnAddrsCh
addrs1 := <-cc.NewSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[1]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.newSubConnCh
sc1 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)
// Test pick with 1.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testSubConn{})) {
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
}
@ -153,30 +154,30 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build()))
select {
case <-cc.newPickerCh:
case <-cc.NewPickerCh:
t.Fatalf("got unexpected new picker")
case <-cc.newSubConnCh:
case <-cc.NewSubConnCh:
t.Fatalf("got unexpected new SubConn")
case <-cc.removeSubConnCh:
case <-cc.RemoveSubConnCh:
t.Fatalf("got unexpected remove SubConn")
case <-time.After(time.Millisecond * 100):
}
// Turn down 1, use 2
edsb.HandleSubConnStateChange(sc1, connectivity.TransientFailure)
addrs2 := <-cc.newSubConnAddrsCh
addrs2 := <-cc.NewSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc2 := <-cc.newSubConnCh
sc2 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)
// Test pick with 2.
p2 := <-cc.newPickerCh
p2 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p2.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc2, cmp.AllowUnexported(testSubConn{})) {
if !cmp.Equal(gotSCSt.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2)
}
}
@ -188,13 +189,13 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab3.Build()))
// p2 SubConns are removed.
scToRemove := <-cc.removeSubConnCh
if !cmp.Equal(scToRemove, sc2, cmp.AllowUnexported(testSubConn{})) {
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove)
}
// Should get an update with 1's old picker, to override 2's old picker.
p3 := <-cc.newPickerCh
p3 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := p3.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure {
t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err)
@ -206,7 +207,7 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
//
// Init 0 and 1; 0 and 1 both down; add 2, use 2.
func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
cc := newTestClientConn(t)
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
@ -216,24 +217,24 @@ func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
addrs0 := <-cc.newSubConnAddrsCh
addrs0 := <-cc.NewSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc0 := <-cc.newSubConnCh
sc0 := <-cc.NewSubConnCh
// Turn down 0, 1 is used.
edsb.HandleSubConnStateChange(sc0, connectivity.TransientFailure)
addrs1 := <-cc.newSubConnAddrsCh
addrs1 := <-cc.NewSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[1]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.newSubConnCh
sc1 := <-cc.NewSubConnCh
// Turn down 1, pick should error.
edsb.HandleSubConnStateChange(sc1, connectivity.TransientFailure)
// Test pick failure.
pFail := <-cc.newPickerCh
pFail := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := pFail.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure {
t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err)
@ -247,19 +248,19 @@ func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build()))
addrs2 := <-cc.newSubConnAddrsCh
addrs2 := <-cc.NewSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc2 := <-cc.newSubConnCh
sc2 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)
// Test pick with 2.
p2 := <-cc.newPickerCh
p2 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p2.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc2, cmp.AllowUnexported(testSubConn{})) {
if !cmp.Equal(gotSCSt.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2)
}
}
@ -271,7 +272,7 @@ func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
func (s) TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) {
defer time.Sleep(10 * time.Millisecond)
cc := newTestClientConn(t)
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
@ -282,34 +283,34 @@ func (s) TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) {
clab1.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
addrs0 := <-cc.newSubConnAddrsCh
addrs0 := <-cc.NewSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc0 := <-cc.newSubConnCh
sc0 := <-cc.NewSubConnCh
// Turn down 0, 1 is used.
edsb.HandleSubConnStateChange(sc0, connectivity.TransientFailure)
addrs1 := <-cc.newSubConnAddrsCh
addrs1 := <-cc.NewSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[1]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.newSubConnCh
sc1 := <-cc.NewSubConnCh
// Turn down 1, 2 is used.
edsb.HandleSubConnStateChange(sc1, connectivity.TransientFailure)
addrs2 := <-cc.newSubConnAddrsCh
addrs2 := <-cc.NewSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc2 := <-cc.newSubConnCh
sc2 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)
// Test pick with 2.
p2 := <-cc.newPickerCh
p2 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p2.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc2, cmp.AllowUnexported(testSubConn{})) {
if !cmp.Equal(gotSCSt.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2)
}
}
@ -321,19 +322,19 @@ func (s) TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) {
//
// With localities caching, the lower priorities are closed after a timeout,
// in goroutines. The order is no longer guaranteed.
scToRemove := []balancer.SubConn{<-cc.removeSubConnCh, <-cc.removeSubConnCh}
if !(cmp.Equal(scToRemove[0], sc1, cmp.AllowUnexported(testSubConn{})) &&
cmp.Equal(scToRemove[1], sc2, cmp.AllowUnexported(testSubConn{}))) &&
!(cmp.Equal(scToRemove[0], sc2, cmp.AllowUnexported(testSubConn{})) &&
cmp.Equal(scToRemove[1], sc1, cmp.AllowUnexported(testSubConn{}))) {
scToRemove := []balancer.SubConn{<-cc.RemoveSubConnCh, <-cc.RemoveSubConnCh}
if !(cmp.Equal(scToRemove[0], sc1, cmp.AllowUnexported(testutils.TestSubConn{})) &&
cmp.Equal(scToRemove[1], sc2, cmp.AllowUnexported(testutils.TestSubConn{}))) &&
!(cmp.Equal(scToRemove[0], sc2, cmp.AllowUnexported(testutils.TestSubConn{})) &&
cmp.Equal(scToRemove[1], sc1, cmp.AllowUnexported(testutils.TestSubConn{}))) {
t.Errorf("RemoveSubConn, want [%v, %v], got %v", sc1, sc2, scToRemove)
}
// Test pick with 0.
p0 := <-cc.newPickerCh
p0 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p0.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testSubConn{})) {
if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
}
}
@ -353,7 +354,7 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) {
}
}()()
cc := newTestClientConn(t)
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
@ -363,11 +364,11 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) {
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
addrs0 := <-cc.newSubConnAddrsCh
addrs0 := <-cc.NewSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc0 := <-cc.newSubConnCh
sc0 := <-cc.NewSubConnCh
// Keep 0 in connecting, 1 will be used after init timeout.
edsb.HandleSubConnStateChange(sc0, connectivity.Connecting)
@ -375,24 +376,24 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) {
// Make sure new SubConn is created before timeout.
select {
case <-time.After(testPriorityInitTimeout * 3 / 4):
case <-cc.newSubConnAddrsCh:
case <-cc.NewSubConnAddrsCh:
t.Fatalf("Got a new SubConn too early (Within timeout). Expect a new SubConn only after timeout")
}
addrs1 := <-cc.newSubConnAddrsCh
addrs1 := <-cc.NewSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[1]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.newSubConnCh
sc1 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)
// Test pick with 1.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testSubConn{})) {
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
}
@ -403,7 +404,7 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) {
// - start with 2 locality with p0 and p1
// - add localities to existing p0 and p1
func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
cc := newTestClientConn(t)
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
@ -413,51 +414,51 @@ func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
clab0.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab0.Build()))
addrs0 := <-cc.newSubConnAddrsCh
addrs0 := <-cc.NewSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc0 := <-cc.newSubConnCh
sc0 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc0, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc0, connectivity.Ready)
// Test roundrobin with only p0 subconns.
p0 := <-cc.newPickerCh
p0 := <-cc.NewPickerCh
want := []balancer.SubConn{sc0}
if err := isRoundRobin(want, subConnFromPicker(p0)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p0)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Turn down p0 subconns, p1 subconns will be created.
edsb.HandleSubConnStateChange(sc0, connectivity.TransientFailure)
addrs1 := <-cc.newSubConnAddrsCh
addrs1 := <-cc.NewSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[1]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.newSubConnCh
sc1 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)
// Test roundrobin with only p1 subconns.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
want = []balancer.SubConn{sc1}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Reconnect p0 subconns, p1 subconn will be closed.
edsb.HandleSubConnStateChange(sc0, connectivity.Ready)
scToRemove := <-cc.removeSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testSubConn{})) {
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
}
// Test roundrobin with only p0 subconns.
p2 := <-cc.newPickerCh
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{sc0}
if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
@ -469,18 +470,18 @@ func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
clab1.AddLocality(testSubZones[3], 1, 1, testEndpointAddrs[3:4], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
addrs2 := <-cc.newSubConnAddrsCh
addrs2 := <-cc.NewSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc2 := <-cc.newSubConnCh
sc2 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)
// Test roundrobin with only two p0 subconns.
p3 := <-cc.newPickerCh
p3 := <-cc.NewPickerCh
want = []balancer.SubConn{sc0, sc2}
if err := isRoundRobin(want, subConnFromPicker(p3)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p3)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
@ -488,17 +489,17 @@ func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
edsb.HandleSubConnStateChange(sc0, connectivity.TransientFailure)
edsb.HandleSubConnStateChange(sc2, connectivity.TransientFailure)
sc3 := <-cc.newSubConnCh
sc3 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc3, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc3, connectivity.Ready)
sc4 := <-cc.newSubConnCh
sc4 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc4, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc4, connectivity.Ready)
// Test roundrobin with only p1 subconns.
p4 := <-cc.newPickerCh
p4 := <-cc.NewPickerCh
want = []balancer.SubConn{sc3, sc4}
if err := isRoundRobin(want, subConnFromPicker(p4)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p4)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
@ -514,7 +515,7 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) {
}
}()()
cc := newTestClientConn(t)
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
@ -524,18 +525,18 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) {
clab0.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab0.Build()))
addrs0 := <-cc.newSubConnAddrsCh
addrs0 := <-cc.NewSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc0 := <-cc.newSubConnCh
sc0 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc0, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc0, connectivity.Ready)
// Test roundrobin with only p0 subconns.
p0 := <-cc.newPickerCh
p0 := <-cc.NewPickerCh
want := []balancer.SubConn{sc0}
if err := isRoundRobin(want, subConnFromPicker(p0)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p0)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
@ -544,13 +545,13 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) {
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
// p0 subconn should be removed.
scToRemove := <-cc.removeSubConnCh
if !cmp.Equal(scToRemove, sc0, cmp.AllowUnexported(testSubConn{})) {
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc0, scToRemove)
}
// Test pick return TransientFailure.
pFail := <-cc.newPickerCh
pFail := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := pFail.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure {
t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err)
@ -563,29 +564,29 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) {
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[3:4], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build()))
addrs01 := <-cc.newSubConnAddrsCh
addrs01 := <-cc.NewSubConnAddrsCh
if got, want := addrs01[0].Addr, testEndpointAddrs[2]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc01 := <-cc.newSubConnCh
sc01 := <-cc.NewSubConnCh
// Don't send any update to p0, so to not override the old state of p0.
// Later, connect to p1 and then remove p1. This will fallback to p0, and
// will send p0's old picker if they are not correctly removed.
// p1 will be used after priority init timeout.
addrs11 := <-cc.newSubConnAddrsCh
addrs11 := <-cc.NewSubConnAddrsCh
if got, want := addrs11[0].Addr, testEndpointAddrs[3]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc11 := <-cc.newSubConnCh
sc11 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc11, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc11, connectivity.Ready)
// Test roundrobin with only p1 subconns.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
want = []balancer.SubConn{sc11}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
@ -595,13 +596,13 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) {
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab3.Build()))
// p1 subconn should be removed.
scToRemove1 := <-cc.removeSubConnCh
if !cmp.Equal(scToRemove1, sc11, cmp.AllowUnexported(testSubConn{})) {
scToRemove1 := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove1, sc11, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc11, scToRemove1)
}
// Test pick return TransientFailure.
pFail1 := <-cc.newPickerCh
pFail1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if scst, err := pFail1.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable {
t.Fatalf("want pick error _, %v, got %v, _ ,%v", balancer.ErrTransientFailure, scst, err)
@ -614,18 +615,18 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) {
edsb.HandleSubConnStateChange(sc01, connectivity.Ready)
// Test roundrobin with only p0 subconns.
p2 := <-cc.newPickerCh
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{sc01}
if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
select {
case <-cc.newPickerCh:
case <-cc.NewPickerCh:
t.Fatalf("got unexpected new picker")
case <-cc.newSubConnCh:
case <-cc.NewSubConnCh:
t.Fatalf("got unexpected new SubConn")
case <-cc.removeSubConnCh:
case <-cc.RemoveSubConnCh:
t.Fatalf("got unexpected remove SubConn")
case <-time.After(time.Millisecond * 100):
}
@ -658,7 +659,7 @@ func (s) TestPriorityType(t *testing.T) {
// Test the case where the high priority contains no backends. The low priority
// will be used.
func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) {
cc := newTestClientConn(t)
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
@ -668,20 +669,20 @@ func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) {
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
addrs1 := <-cc.newSubConnAddrsCh
addrs1 := <-cc.NewSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.newSubConnCh
sc1 := <-cc.NewSubConnCh
// p0 is ready.
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)
// Test roundrobin with only p0 subconns.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
@ -693,23 +694,23 @@ func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) {
// p0 will remove the subconn, and ClientConn will send a sc update to
// shutdown.
scToRemove := <-cc.removeSubConnCh
scToRemove := <-cc.RemoveSubConnCh
edsb.HandleSubConnStateChange(scToRemove, connectivity.Shutdown)
addrs2 := <-cc.newSubConnAddrsCh
addrs2 := <-cc.NewSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[1]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc2 := <-cc.newSubConnCh
sc2 := <-cc.NewSubConnCh
// p1 is ready.
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)
// Test roundrobin with only p1 subconns.
p2 := <-cc.newPickerCh
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{sc2}
if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
@ -717,7 +718,7 @@ func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) {
// Test the case where the high priority contains no healthy backends. The low
// priority will be used.
func (s) TestEDSPriority_HighPriorityAllUnhealthy(t *testing.T) {
cc := newTestClientConn(t)
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
@ -727,20 +728,20 @@ func (s) TestEDSPriority_HighPriorityAllUnhealthy(t *testing.T) {
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
addrs1 := <-cc.newSubConnAddrsCh
addrs1 := <-cc.NewSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.newSubConnCh
sc1 := <-cc.NewSubConnCh
// p0 is ready.
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)
// Test roundrobin with only p0 subconns.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
@ -754,23 +755,23 @@ func (s) TestEDSPriority_HighPriorityAllUnhealthy(t *testing.T) {
// p0 will remove the subconn, and ClientConn will send a sc update to
// transient failure.
scToRemove := <-cc.removeSubConnCh
scToRemove := <-cc.RemoveSubConnCh
edsb.HandleSubConnStateChange(scToRemove, connectivity.Shutdown)
addrs2 := <-cc.newSubConnAddrsCh
addrs2 := <-cc.NewSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[1]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc2 := <-cc.newSubConnCh
sc2 := <-cc.NewSubConnCh
// p1 is ready.
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)
// Test roundrobin with only p1 subconns.
p2 := <-cc.newPickerCh
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{sc2}
if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}

View File

@ -29,7 +29,9 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/testutils"
)
var (
@ -38,10 +40,14 @@ var (
testEndpointAddrs []string
)
const testBackendAddrsCount = 12
func init() {
for i := 0; i < testBackendAddrsCount; i++ {
testEndpointAddrs = append(testEndpointAddrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
}
balancergroup.NewRandomWRR = testutils.NewTestWRR
balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
}
// One locality
@ -50,7 +56,7 @@ func init() {
// - replace backend
// - change drop rate
func (s) TestEDS_OneLocality(t *testing.T) {
cc := newTestClientConn(t)
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
@ -59,15 +65,15 @@ func (s) TestEDS_OneLocality(t *testing.T) {
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
sc1 := <-cc.newSubConnCh
sc1 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)
// Pick with only the first backend.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testSubConn{})) {
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
}
@ -77,14 +83,14 @@ func (s) TestEDS_OneLocality(t *testing.T) {
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build()))
sc2 := <-cc.newSubConnCh
sc2 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)
// Test roundrobin with two subconns.
p2 := <-cc.newPickerCh
p2 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc2}
if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
@ -93,17 +99,17 @@ func (s) TestEDS_OneLocality(t *testing.T) {
clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab3.Build()))
scToRemove := <-cc.removeSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testSubConn{})) {
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
}
edsb.HandleSubConnStateChange(scToRemove, connectivity.Shutdown)
// Test pick with only the second subconn.
p3 := <-cc.newPickerCh
p3 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p3.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc2, cmp.AllowUnexported(testSubConn{})) {
if !cmp.Equal(gotSCSt.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2)
}
}
@ -113,20 +119,20 @@ func (s) TestEDS_OneLocality(t *testing.T) {
clab4.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab4.Build()))
sc3 := <-cc.newSubConnCh
sc3 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc3, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc3, connectivity.Ready)
scToRemove = <-cc.removeSubConnCh
if !cmp.Equal(scToRemove, sc2, cmp.AllowUnexported(testSubConn{})) {
scToRemove = <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove)
}
edsb.HandleSubConnStateChange(scToRemove, connectivity.Shutdown)
// Test pick with only the third subconn.
p4 := <-cc.newPickerCh
p4 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p4.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc3, cmp.AllowUnexported(testSubConn{})) {
if !cmp.Equal(gotSCSt.SubConn, sc3, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc3)
}
}
@ -137,7 +143,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab5.Build()))
// Picks with drops.
p5 := <-cc.newPickerCh
p5 := <-cc.NewPickerCh
for i := 0; i < 100; i++ {
_, err := p5.Pick(balancer.PickInfo{})
// TODO: the dropping algorithm needs a design. When the dropping algorithm
@ -155,10 +161,10 @@ func (s) TestEDS_OneLocality(t *testing.T) {
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab6.Build()))
// Pick without drops.
p6 := <-cc.newPickerCh
p6 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p6.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc3, cmp.AllowUnexported(testSubConn{})) {
if !cmp.Equal(gotSCSt.SubConn, sc3, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc3)
}
}
@ -171,7 +177,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
// - address change for the <not-the-first> locality
// - update locality weight
func (s) TestEDS_TwoLocalities(t *testing.T) {
cc := newTestClientConn(t)
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
@ -179,7 +185,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
sc1 := <-cc.newSubConnCh
sc1 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)
@ -188,14 +194,14 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
// keep localities.
clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
sc2 := <-cc.newSubConnCh
sc2 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)
// Test roundrobin with two subconns.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc2}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
@ -206,14 +212,14 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
clab2.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build()))
sc3 := <-cc.newSubConnCh
sc3 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc3, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc3, connectivity.Ready)
// Test roundrobin with three subconns.
p2 := <-cc.newPickerCh
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{sc1, sc2, sc3}
if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
@ -223,16 +229,16 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
clab3.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab3.Build()))
scToRemove := <-cc.removeSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testSubConn{})) {
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
}
edsb.HandleSubConnStateChange(scToRemove, connectivity.Shutdown)
// Test pick with two subconns (without the first one).
p3 := <-cc.newPickerCh
p3 := <-cc.NewPickerCh
want = []balancer.SubConn{sc2, sc3}
if err := isRoundRobin(want, subConnFromPicker(p3)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p3)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
@ -242,17 +248,17 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
clab4.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab4.Build()))
sc4 := <-cc.newSubConnCh
sc4 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc4, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc4, connectivity.Ready)
// Test pick with two subconns (without the first one).
p4 := <-cc.newPickerCh
p4 := <-cc.NewPickerCh
// Locality-1 will be picked twice, and locality-2 will be picked twice.
// Locality-1 contains only sc2, locality-2 contains sc3 and sc4. So expect
// two sc2's and sc3, sc4.
want = []balancer.SubConn{sc2, sc2, sc3, sc4}
if err := isRoundRobin(want, subConnFromPicker(p4)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p4)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
@ -263,12 +269,12 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab5.Build()))
// Test pick with two subconns different locality weight.
p5 := <-cc.newPickerCh
p5 := <-cc.NewPickerCh
// Locality-1 will be picked four times, and locality-2 will be picked twice
// (weight 2 and 1). Locality-1 contains only sc2, locality-2 contains sc3 and
// sc4. So expect four sc2's and sc3, sc4.
want = []balancer.SubConn{sc2, sc2, sc2, sc2, sc3, sc4}
if err := isRoundRobin(want, subConnFromPicker(p5)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p5)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
@ -284,17 +290,17 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
// NOTE: this is because we handle locality with weight 0 same as the
// locality doesn't exist. If this changes in the future, this removeSubConn
// behavior will also change.
scToRemove2 := <-cc.removeSubConnCh
if !cmp.Equal(scToRemove2, sc2, cmp.AllowUnexported(testSubConn{})) {
scToRemove2 := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove2, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove2)
}
// Test pick with two subconns different locality weight.
p6 := <-cc.newPickerCh
p6 := <-cc.NewPickerCh
// Locality-1 will be not be picked, and locality-2 will be picked.
// Locality-2 contains sc3 and sc4. So expect sc3, sc4.
want = []balancer.SubConn{sc3, sc4}
if err := isRoundRobin(want, subConnFromPicker(p6)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p6)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
@ -302,7 +308,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
// The EDS balancer gets EDS resp with unhealthy endpoints. Test that only
// healthy ones are used.
func (s) TestEDS_EndpointsHealth(t *testing.T) {
cc := newTestClientConn(t)
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
@ -335,9 +341,9 @@ func (s) TestEDS_EndpointsHealth(t *testing.T) {
newSubConnAddrStrs []string
)
for i := 0; i < 4; i++ {
addr := <-cc.newSubConnAddrsCh
addr := <-cc.NewSubConnAddrsCh
newSubConnAddrStrs = append(newSubConnAddrStrs, addr[0].Addr)
sc := <-cc.newSubConnCh
sc := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc, connectivity.Ready)
readySCs = append(readySCs, sc)
@ -361,15 +367,15 @@ func (s) TestEDS_EndpointsHealth(t *testing.T) {
// There should be exactly 4 new SubConns. Check to make sure there's no
// more subconns being created.
select {
case <-cc.newSubConnCh:
case <-cc.NewSubConnCh:
t.Fatalf("Got unexpected new subconn")
case <-time.After(time.Microsecond * 100):
}
// Test roundrobin with the subconns.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
want := readySCs
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
@ -381,57 +387,11 @@ func (s) TestClose(t *testing.T) {
edsb.Close()
}
func init() {
balancer.Register(&testConstBalancerBuilder{})
}
var errTestConstPicker = fmt.Errorf("const picker error")
type testConstBalancerBuilder struct{}
func (*testConstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return &testConstBalancer{cc: cc}
}
func (*testConstBalancerBuilder) Name() string {
return "test-const-balancer"
}
type testConstBalancer struct {
cc balancer.ClientConn
}
func (tb *testConstBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
tb.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &testConstPicker{err: errTestConstPicker}})
}
func (tb *testConstBalancer) HandleResolvedAddrs(a []resolver.Address, err error) {
if len(a) == 0 {
return
}
tb.cc.NewSubConn(a, balancer.NewSubConnOptions{})
}
func (*testConstBalancer) Close() {
}
type testConstPicker struct {
err error
sc balancer.SubConn
}
func (tcp *testConstPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
if tcp.err != nil {
return balancer.PickResult{}, tcp.err
}
return balancer.PickResult{SubConn: tcp.sc}, nil
}
// Create XDS balancer, and update sub-balancer before handling eds responses.
// Then switch between round-robin and test-const-balancer after handling first
// eds response.
func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
cc := newTestClientConn(t)
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
@ -445,15 +405,15 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
for i := 0; i < 2; i++ {
sc := <-cc.newSubConnCh
sc := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc, connectivity.Ready)
}
p0 := <-cc.newPickerCh
p0 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
_, err := p0.Pick(balancer.PickInfo{})
if err != errTestConstPicker {
t.Fatalf("picker.Pick, got err %q, want err %q", err, errTestConstPicker)
if err != testutils.ErrTestConstPicker {
t.Fatalf("picker.Pick, got err %q, want err %q", err, testutils.ErrTestConstPicker)
}
}
@ -461,20 +421,20 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
edsb.HandleChildPolicy(roundrobin.Name, nil)
for i := 0; i < 2; i++ {
<-cc.removeSubConnCh
<-cc.RemoveSubConnCh
}
sc1 := <-cc.newSubConnCh
sc1 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)
sc2 := <-cc.newSubConnCh
sc2 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)
// Test roundrobin with two subconns.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc2}
if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
@ -482,24 +442,24 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
edsb.HandleChildPolicy("test-const-balancer", nil)
for i := 0; i < 2; i++ {
scToRemove := <-cc.removeSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testSubConn{})) &&
!cmp.Equal(scToRemove, sc2, cmp.AllowUnexported(testSubConn{})) {
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) &&
!cmp.Equal(scToRemove, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want (%v or %v), got %v", sc1, sc2, scToRemove)
}
edsb.HandleSubConnStateChange(scToRemove, connectivity.Shutdown)
}
for i := 0; i < 2; i++ {
sc := <-cc.newSubConnCh
sc := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc, connectivity.Ready)
}
p2 := <-cc.newPickerCh
p2 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
_, err := p2.Pick(balancer.PickInfo{})
if err != errTestConstPicker {
t.Fatalf("picker.Pick, got err %q, want err %q", err, errTestConstPicker)
if err != testutils.ErrTestConstPicker {
t.Fatalf("picker.Pick, got err %q, want err %q", err, testutils.ErrTestConstPicker)
}
}
@ -507,19 +467,19 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
edsb.HandleChildPolicy(roundrobin.Name, nil)
for i := 0; i < 2; i++ {
<-cc.removeSubConnCh
<-cc.RemoveSubConnCh
}
sc3 := <-cc.newSubConnCh
sc3 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc3, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc3, connectivity.Ready)
sc4 := <-cc.newSubConnCh
sc4 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc4, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc4, connectivity.Ready)
p3 := <-cc.newPickerCh
p3 := <-cc.NewPickerCh
want = []balancer.SubConn{sc3, sc4}
if err := isRoundRobin(want, subConnFromPicker(p3)); err != nil {
if err := testutils.IsRoundRobin(want, subConnFromPicker(p3)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
@ -552,7 +512,7 @@ var errTestInlineStateUpdate = fmt.Errorf("don't like addresses, empty or not")
func (tb *testInlineUpdateBalancer) HandleResolvedAddrs(a []resolver.Address, err error) {
tb.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &testConstPicker{err: errTestInlineStateUpdate},
Picker: &testutils.TestConstPicker{Err: errTestInlineStateUpdate},
})
}
@ -563,7 +523,7 @@ func (*testInlineUpdateBalancer) Close() {
// (e.g., roundrobin handling empty addresses). There could be deadlock caused
// by acquiring a locked mutex.
func (s) TestEDS_ChildPolicyUpdatePickerInline(t *testing.T) {
cc := newTestClientConn(t)
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, nil, nil)
edsb.enqueueChildBalancerStateUpdate = func(p priorityType, state balancer.State) {
// For this test, euqueue needs to happen asynchronously (like in the
@ -577,7 +537,7 @@ func (s) TestEDS_ChildPolicyUpdatePickerInline(t *testing.T) {
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
p0 := <-cc.newPickerCh
p0 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
_, err := p0.Pick(balancer.PickInfo{})
if err != errTestInlineStateUpdate {
@ -588,8 +548,8 @@ func (s) TestEDS_ChildPolicyUpdatePickerInline(t *testing.T) {
func (s) TestDropPicker(t *testing.T) {
const pickCount = 12
var constPicker = &testConstPicker{
sc: testSubConns[0],
var constPicker = &testutils.TestConstPicker{
SC: testutils.TestSubConns[0],
}
tests := []struct {
@ -652,9 +612,9 @@ func (s) TestDropPicker(t *testing.T) {
}
func (s) TestEDS_LoadReport(t *testing.T) {
testLoadStore := newTestLoadStore()
testLoadStore := testutils.NewTestLoadStore()
cc := newTestClientConn(t)
cc := testutils.NewTestClientConn(t)
edsb := newEDSBalancerImpl(cc, nil, testLoadStore, nil)
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
@ -664,7 +624,7 @@ func (s) TestEDS_LoadReport(t *testing.T) {
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
sc1 := <-cc.newSubConnCh
sc1 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)
backendToBalancerID[sc1] = internal.Locality{
@ -676,7 +636,7 @@ func (s) TestEDS_LoadReport(t *testing.T) {
// keep localities.
clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
sc2 := <-cc.newSubConnCh
sc2 := <-cc.NewSubConnCh
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)
backendToBalancerID[sc2] = internal.Locality{
@ -684,7 +644,7 @@ func (s) TestEDS_LoadReport(t *testing.T) {
}
// Test roundrobin with two subconns.
p1 := <-cc.newPickerCh
p1 := <-cc.NewPickerCh
var (
wantStart []internal.Locality
wantEnd []internal.Locality
@ -700,10 +660,10 @@ func (s) TestEDS_LoadReport(t *testing.T) {
}
}
if !cmp.Equal(testLoadStore.callsStarted, wantStart) {
t.Fatalf("want started: %v, got: %v", testLoadStore.callsStarted, wantStart)
if !cmp.Equal(testLoadStore.CallsStarted, wantStart) {
t.Fatalf("want started: %v, got: %v", testLoadStore.CallsStarted, wantStart)
}
if !cmp.Equal(testLoadStore.callsEnded, wantEnd) {
t.Fatalf("want ended: %v, got: %v", testLoadStore.callsEnded, wantEnd)
if !cmp.Equal(testLoadStore.CallsEnded, wantEnd) {
t.Fatalf("want ended: %v, got: %v", testLoadStore.CallsEnded, wantEnd)
}
}

View File

@ -55,6 +55,13 @@ func init() {
}
}
func subConnFromPicker(p balancer.V2Picker) func() balancer.SubConn {
return func() balancer.SubConn {
scst, _ := p.Pick(balancer.PickInfo{})
return scst.SubConn
}
}
type s struct {
grpctest.Tester
}

View File

@ -1,350 +0,0 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edsbalancer
import (
"context"
"fmt"
"testing"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
)
const testSubConnsCount = 16
var testSubConns []*testSubConn
func init() {
for i := 0; i < testSubConnsCount; i++ {
testSubConns = append(testSubConns, &testSubConn{
id: fmt.Sprintf("sc%d", i),
})
}
}
type testSubConn struct {
id string
}
func (tsc *testSubConn) UpdateAddresses([]resolver.Address) {
panic("not implemented")
}
func (tsc *testSubConn) Connect() {
}
// Implement stringer to get human friendly error message.
func (tsc *testSubConn) String() string {
return tsc.id
}
type testClientConn struct {
t *testing.T // For logging only.
newSubConnAddrsCh chan []resolver.Address // The last 10 []Address to create subconn.
newSubConnCh chan balancer.SubConn // The last 10 subconn created.
removeSubConnCh chan balancer.SubConn // The last 10 subconn removed.
newPickerCh chan balancer.V2Picker // The last picker updated.
newStateCh chan connectivity.State // The last state.
subConnIdx int
}
func newTestClientConn(t *testing.T) *testClientConn {
return &testClientConn{
t: t,
newSubConnAddrsCh: make(chan []resolver.Address, 10),
newSubConnCh: make(chan balancer.SubConn, 10),
removeSubConnCh: make(chan balancer.SubConn, 10),
newPickerCh: make(chan balancer.V2Picker, 1),
newStateCh: make(chan connectivity.State, 1),
}
}
func (tcc *testClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubConnOptions) (balancer.SubConn, error) {
sc := testSubConns[tcc.subConnIdx]
tcc.subConnIdx++
tcc.t.Logf("testClientConn: NewSubConn(%v, %+v) => %s", a, o, sc)
select {
case tcc.newSubConnAddrsCh <- a:
default:
}
select {
case tcc.newSubConnCh <- sc:
default:
}
return sc, nil
}
func (tcc *testClientConn) RemoveSubConn(sc balancer.SubConn) {
tcc.t.Logf("testClientCOnn: RemoveSubConn(%p)", sc)
select {
case tcc.removeSubConnCh <- sc:
default:
}
}
func (tcc *testClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
tcc.t.Fatal("not implemented")
}
func (tcc *testClientConn) UpdateState(bs balancer.State) {
tcc.t.Logf("testClientConn: UpdateState(%v)", bs)
select {
case <-tcc.newStateCh:
default:
}
tcc.newStateCh <- bs.ConnectivityState
select {
case <-tcc.newPickerCh:
default:
}
tcc.newPickerCh <- bs.Picker
}
func (tcc *testClientConn) ResolveNow(resolver.ResolveNowOptions) {
panic("not implemented")
}
func (tcc *testClientConn) Target() string {
panic("not implemented")
}
type testServerLoad struct {
name string
d float64
}
type testLoadStore struct {
callsStarted []internal.Locality
callsEnded []internal.Locality
callsCost []testServerLoad
}
func newTestLoadStore() *testLoadStore {
return &testLoadStore{}
}
func (*testLoadStore) CallDropped(category string) {
panic("not implemented")
}
func (tls *testLoadStore) CallStarted(l internal.Locality) {
tls.callsStarted = append(tls.callsStarted, l)
}
func (tls *testLoadStore) CallFinished(l internal.Locality, err error) {
tls.callsEnded = append(tls.callsEnded, l)
}
func (tls *testLoadStore) CallServerLoad(l internal.Locality, name string, d float64) {
tls.callsCost = append(tls.callsCost, testServerLoad{name: name, d: d})
}
func (*testLoadStore) ReportTo(ctx context.Context, cc *grpc.ClientConn, clusterName string, node *corepb.Node) {
panic("not implemented")
}
// isRoundRobin checks whether f's return value is roundrobin of elements from
// want. But it doesn't check for the order. Note that want can contain
// duplicate items, which makes it weight-round-robin.
//
// Step 1. the return values of f should form a permutation of all elements in
// want, but not necessary in the same order. E.g. if want is {a,a,b}, the check
// fails if f returns:
// - {a,a,a}: third a is returned before b
// - {a,b,b}: second b is returned before the second a
//
// If error is found in this step, the returned error contains only the first
// iteration until where it goes wrong.
//
// Step 2. the return values of f should be repetitions of the same permutation.
// E.g. if want is {a,a,b}, the check failes if f returns:
// - {a,b,a,b,a,a}: though it satisfies step 1, the second iteration is not
// repeating the first iteration.
//
// If error is found in this step, the returned error contains the first
// iteration + the second iteration until where it goes wrong.
func isRoundRobin(want []balancer.SubConn, f func() balancer.SubConn) error {
wantSet := make(map[balancer.SubConn]int) // SubConn -> count, for weighted RR.
for _, sc := range want {
wantSet[sc]++
}
// The first iteration: makes sure f's return values form a permutation of
// elements in want.
//
// Also keep the returns values in a slice, so we can compare the order in
// the second iteration.
gotSliceFirstIteration := make([]balancer.SubConn, 0, len(want))
for range want {
got := f()
gotSliceFirstIteration = append(gotSliceFirstIteration, got)
wantSet[got]--
if wantSet[got] < 0 {
return fmt.Errorf("non-roundrobin want: %v, result: %v", want, gotSliceFirstIteration)
}
}
// The second iteration should repeat the first iteration.
var gotSliceSecondIteration []balancer.SubConn
for i := 0; i < 2; i++ {
for _, w := range gotSliceFirstIteration {
g := f()
gotSliceSecondIteration = append(gotSliceSecondIteration, g)
if w != g {
return fmt.Errorf("non-roundrobin, first iter: %v, second iter: %v", gotSliceFirstIteration, gotSliceSecondIteration)
}
}
}
return nil
}
// testClosure is a test util for TestIsRoundRobin.
type testClosure struct {
r []balancer.SubConn
i int
}
func (tc *testClosure) next() balancer.SubConn {
ret := tc.r[tc.i]
tc.i = (tc.i + 1) % len(tc.r)
return ret
}
func (s) TestIsRoundRobin(t *testing.T) {
var (
sc1 = testSubConns[0]
sc2 = testSubConns[1]
sc3 = testSubConns[2]
)
testCases := []struct {
desc string
want []balancer.SubConn
got []balancer.SubConn
pass bool
}{
{
desc: "0 element",
want: []balancer.SubConn{},
got: []balancer.SubConn{},
pass: true,
},
{
desc: "1 element RR",
want: []balancer.SubConn{sc1},
got: []balancer.SubConn{sc1, sc1, sc1, sc1},
pass: true,
},
{
desc: "1 element not RR",
want: []balancer.SubConn{sc1},
got: []balancer.SubConn{sc1, sc2, sc1},
pass: false,
},
{
desc: "2 elements RR",
want: []balancer.SubConn{sc1, sc2},
got: []balancer.SubConn{sc1, sc2, sc1, sc2, sc1, sc2},
pass: true,
},
{
desc: "2 elements RR different order from want",
want: []balancer.SubConn{sc2, sc1},
got: []balancer.SubConn{sc1, sc2, sc1, sc2, sc1, sc2},
pass: true,
},
{
desc: "2 elements RR not RR, mistake in first iter",
want: []balancer.SubConn{sc1, sc2},
got: []balancer.SubConn{sc1, sc1, sc1, sc2, sc1, sc2},
pass: false,
},
{
desc: "2 elements RR not RR, mistake in second iter",
want: []balancer.SubConn{sc1, sc2},
got: []balancer.SubConn{sc1, sc2, sc1, sc1, sc1, sc2},
pass: false,
},
{
desc: "2 elements weighted RR",
want: []balancer.SubConn{sc1, sc1, sc2},
got: []balancer.SubConn{sc1, sc1, sc2, sc1, sc1, sc2},
pass: true,
},
{
desc: "2 elements weighted RR different order",
want: []balancer.SubConn{sc1, sc1, sc2},
got: []balancer.SubConn{sc1, sc2, sc1, sc1, sc2, sc1},
pass: true,
},
{
desc: "3 elements RR",
want: []balancer.SubConn{sc1, sc2, sc3},
got: []balancer.SubConn{sc1, sc2, sc3, sc1, sc2, sc3, sc1, sc2, sc3},
pass: true,
},
{
desc: "3 elements RR different order",
want: []balancer.SubConn{sc1, sc2, sc3},
got: []balancer.SubConn{sc3, sc2, sc1, sc3, sc2, sc1},
pass: true,
},
{
desc: "3 elements weighted RR",
want: []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc3},
got: []balancer.SubConn{sc1, sc2, sc3, sc1, sc2, sc1, sc1, sc2, sc3, sc1, sc2, sc1},
pass: true,
},
{
desc: "3 elements weighted RR not RR, mistake in first iter",
want: []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc3},
got: []balancer.SubConn{sc1, sc2, sc1, sc1, sc2, sc1, sc1, sc2, sc3, sc1, sc2, sc1},
pass: false,
},
{
desc: "3 elements weighted RR not RR, mistake in second iter",
want: []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc3},
got: []balancer.SubConn{sc1, sc2, sc3, sc1, sc2, sc1, sc1, sc1, sc3, sc1, sc2, sc1},
pass: false,
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
err := isRoundRobin(tC.want, (&testClosure{r: tC.got}).next)
if err == nil != tC.pass {
t.Errorf("want pass %v, want %v, got err %v", tC.pass, tC.want, err)
}
})
}
}

View File

@ -21,6 +21,8 @@ import (
xdsclient "google.golang.org/grpc/xds/internal/client"
)
var newRandomWRR = wrr.NewRandom
type dropper struct {
c xdsclient.OverloadDropConfig
w wrr.WRR

View File

@ -17,57 +17,14 @@
package edsbalancer
import (
"sync"
"testing"
"google.golang.org/grpc/internal/wrr"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/testutils"
)
// testWRR is a deterministic WRR implementation.
//
// The real implementation does random WRR. testWRR makes the balancer behavior
// deterministic and easier to test.
//
// With {a: 2, b: 3}, the Next() results will be {a, a, b, b, b}.
type testWRR struct {
itemsWithWeight []struct {
item interface{}
weight int64
}
length int
mu sync.Mutex
idx int // The index of the item that will be picked
count int64 // The number of times the current item has been picked.
}
func newTestWRR() wrr.WRR {
return &testWRR{}
}
func (twrr *testWRR) Add(item interface{}, weight int64) {
twrr.itemsWithWeight = append(twrr.itemsWithWeight, struct {
item interface{}
weight int64
}{item: item, weight: weight})
twrr.length++
}
func (twrr *testWRR) Next() interface{} {
twrr.mu.Lock()
iww := twrr.itemsWithWeight[twrr.idx]
twrr.count++
if twrr.count >= iww.weight {
twrr.idx = (twrr.idx + 1) % twrr.length
twrr.count = 0
}
twrr.mu.Unlock()
return iww.item
}
func init() {
newRandomWRR = newTestWRR
newRandomWRR = testutils.NewTestWRR
}
func (s) TestDropper(t *testing.T) {

View File

@ -0,0 +1,357 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package testutils
import (
"context"
"fmt"
"sync"
"testing"
envoy_api_v2_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
)
const testSubConnsCount = 16
// TestSubConns contains a list of SubConns to be used in tests.
var TestSubConns []*TestSubConn
func init() {
for i := 0; i < testSubConnsCount; i++ {
TestSubConns = append(TestSubConns, &TestSubConn{
id: fmt.Sprintf("sc%d", i),
})
}
}
// TestSubConn implements the SubConn interface, to be used in tests.
type TestSubConn struct {
id string
}
// UpdateAddresses panics.
func (tsc *TestSubConn) UpdateAddresses([]resolver.Address) { panic("not implemented") }
// Connect is a no-op.
func (tsc *TestSubConn) Connect() {}
// String implements stringer to print human friendly error message.
func (tsc *TestSubConn) String() string {
return tsc.id
}
// TestClientConn is a mock balancer.ClientConn used in tests.
type TestClientConn struct {
t *testing.T // For logging only.
NewSubConnAddrsCh chan []resolver.Address // the last 10 []Address to create subconn.
NewSubConnCh chan balancer.SubConn // the last 10 subconn created.
RemoveSubConnCh chan balancer.SubConn // the last 10 subconn removed.
NewPickerCh chan balancer.V2Picker // the last picker updated.
NewStateCh chan connectivity.State // the last state.
subConnIdx int
}
// NewTestClientConn creates a TestClientConn.
func NewTestClientConn(t *testing.T) *TestClientConn {
return &TestClientConn{
t: t,
NewSubConnAddrsCh: make(chan []resolver.Address, 10),
NewSubConnCh: make(chan balancer.SubConn, 10),
RemoveSubConnCh: make(chan balancer.SubConn, 10),
NewPickerCh: make(chan balancer.V2Picker, 1),
NewStateCh: make(chan connectivity.State, 1),
}
}
// NewSubConn creates a new SubConn.
func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubConnOptions) (balancer.SubConn, error) {
sc := TestSubConns[tcc.subConnIdx]
tcc.subConnIdx++
tcc.t.Logf("testClientConn: NewSubConn(%v, %+v) => %s", a, o, sc)
select {
case tcc.NewSubConnAddrsCh <- a:
default:
}
select {
case tcc.NewSubConnCh <- sc:
default:
}
return sc, nil
}
// RemoveSubConn removes the SubConn.
func (tcc *TestClientConn) RemoveSubConn(sc balancer.SubConn) {
tcc.t.Logf("testClientCOnn: RemoveSubConn(%p)", sc)
select {
case tcc.RemoveSubConnCh <- sc:
default:
}
}
// UpdateBalancerState implements balancer.Balancer API. It will be removed when
// switching to the new balancer interface.
func (tcc *TestClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
tcc.t.Fatal("not implemented")
}
// UpdateState updates connectivity state and picker.
func (tcc *TestClientConn) UpdateState(bs balancer.State) {
tcc.t.Logf("testClientConn: UpdateState(%v)", bs)
select {
case <-tcc.NewStateCh:
default:
}
tcc.NewStateCh <- bs.ConnectivityState
select {
case <-tcc.NewPickerCh:
default:
}
tcc.NewPickerCh <- bs.Picker
}
// ResolveNow panics.
func (tcc *TestClientConn) ResolveNow(resolver.ResolveNowOptions) {
panic("not implemented")
}
// Target panics.
func (tcc *TestClientConn) Target() string {
panic("not implemented")
}
// TestServerLoad is testing Load for testing LRS.
type TestServerLoad struct {
Name string
D float64
}
// TestLoadStore is a load store to be used in tests.
type TestLoadStore struct {
CallsStarted []internal.Locality
CallsEnded []internal.Locality
CallsCost []TestServerLoad
}
// NewTestLoadStore creates a new TestLoadStore.
func NewTestLoadStore() *TestLoadStore {
return &TestLoadStore{}
}
// CallDropped records a call dropped.
func (*TestLoadStore) CallDropped(category string) {
panic("not implemented")
}
// CallStarted records a call started.
func (tls *TestLoadStore) CallStarted(l internal.Locality) {
tls.CallsStarted = append(tls.CallsStarted, l)
}
// CallFinished records a call finished.
func (tls *TestLoadStore) CallFinished(l internal.Locality, err error) {
tls.CallsEnded = append(tls.CallsEnded, l)
}
// CallServerLoad records a call server load.
func (tls *TestLoadStore) CallServerLoad(l internal.Locality, name string, d float64) {
tls.CallsCost = append(tls.CallsCost, TestServerLoad{Name: name, D: d})
}
// ReportTo panics.
func (*TestLoadStore) ReportTo(ctx context.Context, cc *grpc.ClientConn, clusterName string, node *envoy_api_v2_core.Node) {
panic("not implemented")
}
// IsRoundRobin checks whether f's return value is roundrobin of elements from
// want. But it doesn't check for the order. Note that want can contain
// duplicate items, which makes it weight-round-robin.
//
// Step 1. the return values of f should form a permutation of all elements in
// want, but not necessary in the same order. E.g. if want is {a,a,b}, the check
// fails if f returns:
// - {a,a,a}: third a is returned before b
// - {a,b,b}: second b is returned before the second a
//
// If error is found in this step, the returned error contains only the first
// iteration until where it goes wrong.
//
// Step 2. the return values of f should be repetitions of the same permutation.
// E.g. if want is {a,a,b}, the check failes if f returns:
// - {a,b,a,b,a,a}: though it satisfies step 1, the second iteration is not
// repeating the first iteration.
//
// If error is found in this step, the returned error contains the first
// iteration + the second iteration until where it goes wrong.
func IsRoundRobin(want []balancer.SubConn, f func() balancer.SubConn) error {
wantSet := make(map[balancer.SubConn]int) // SubConn -> count, for weighted RR.
for _, sc := range want {
wantSet[sc]++
}
// The first iteration: makes sure f's return values form a permutation of
// elements in want.
//
// Also keep the returns values in a slice, so we can compare the order in
// the second iteration.
gotSliceFirstIteration := make([]balancer.SubConn, 0, len(want))
for range want {
got := f()
gotSliceFirstIteration = append(gotSliceFirstIteration, got)
wantSet[got]--
if wantSet[got] < 0 {
return fmt.Errorf("non-roundrobin want: %v, result: %v", want, gotSliceFirstIteration)
}
}
// The second iteration should repeat the first iteration.
var gotSliceSecondIteration []balancer.SubConn
for i := 0; i < 2; i++ {
for _, w := range gotSliceFirstIteration {
g := f()
gotSliceSecondIteration = append(gotSliceSecondIteration, g)
if w != g {
return fmt.Errorf("non-roundrobin, first iter: %v, second iter: %v", gotSliceFirstIteration, gotSliceSecondIteration)
}
}
}
return nil
}
// testClosure is a test util for TestIsRoundRobin.
type testClosure struct {
r []balancer.SubConn
i int
}
func (tc *testClosure) next() balancer.SubConn {
ret := tc.r[tc.i]
tc.i = (tc.i + 1) % len(tc.r)
return ret
}
func init() {
balancer.Register(&TestConstBalancerBuilder{})
}
// ErrTestConstPicker is error returned by test const picker.
var ErrTestConstPicker = fmt.Errorf("const picker error")
// TestConstBalancerBuilder is a balancer builder for tests.
type TestConstBalancerBuilder struct{}
// Build builds a test const balancer.
func (*TestConstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return &testConstBalancer{cc: cc}
}
// Name returns test-const-balancer name.
func (*TestConstBalancerBuilder) Name() string {
return "test-const-balancer"
}
type testConstBalancer struct {
cc balancer.ClientConn
}
func (tb *testConstBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
tb.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &TestConstPicker{Err: ErrTestConstPicker}})
}
func (tb *testConstBalancer) HandleResolvedAddrs(a []resolver.Address, err error) {
if len(a) == 0 {
return
}
tb.cc.NewSubConn(a, balancer.NewSubConnOptions{})
}
func (*testConstBalancer) Close() {
}
// TestConstPicker is a const picker for tests.
type TestConstPicker struct {
Err error
SC balancer.SubConn
}
// Pick returns the const SubConn or the error.
func (tcp *TestConstPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
if tcp.Err != nil {
return balancer.PickResult{}, tcp.Err
}
return balancer.PickResult{SubConn: tcp.SC}, nil
}
// testWRR is a deterministic WRR implementation.
//
// The real implementation does random WRR. testWRR makes the balancer behavior
// deterministic and easier to test.
//
// With {a: 2, b: 3}, the Next() results will be {a, a, b, b, b}.
type testWRR struct {
itemsWithWeight []struct {
item interface{}
weight int64
}
length int
mu sync.Mutex
idx int // The index of the item that will be picked
count int64 // The number of times the current item has been picked.
}
// NewTestWRR return a WRR for testing. It's deterministic instead random.
func NewTestWRR() wrr.WRR {
return &testWRR{}
}
func (twrr *testWRR) Add(item interface{}, weight int64) {
twrr.itemsWithWeight = append(twrr.itemsWithWeight, struct {
item interface{}
weight int64
}{item: item, weight: weight})
twrr.length++
}
func (twrr *testWRR) Next() interface{} {
twrr.mu.Lock()
iww := twrr.itemsWithWeight[twrr.idx]
twrr.count++
if twrr.count >= iww.weight {
twrr.idx = (twrr.idx + 1) % twrr.length
twrr.count = 0
}
twrr.mu.Unlock()
return iww.item
}

View File

@ -0,0 +1,134 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package testutils
import (
"testing"
"google.golang.org/grpc/balancer"
)
func TestIsRoundRobin(t *testing.T) {
var (
sc1 = TestSubConns[0]
sc2 = TestSubConns[1]
sc3 = TestSubConns[2]
)
testCases := []struct {
desc string
want []balancer.SubConn
got []balancer.SubConn
pass bool
}{
{
desc: "0 element",
want: []balancer.SubConn{},
got: []balancer.SubConn{},
pass: true,
},
{
desc: "1 element RR",
want: []balancer.SubConn{sc1},
got: []balancer.SubConn{sc1, sc1, sc1, sc1},
pass: true,
},
{
desc: "1 element not RR",
want: []balancer.SubConn{sc1},
got: []balancer.SubConn{sc1, sc2, sc1},
pass: false,
},
{
desc: "2 elements RR",
want: []balancer.SubConn{sc1, sc2},
got: []balancer.SubConn{sc1, sc2, sc1, sc2, sc1, sc2},
pass: true,
},
{
desc: "2 elements RR different order from want",
want: []balancer.SubConn{sc2, sc1},
got: []balancer.SubConn{sc1, sc2, sc1, sc2, sc1, sc2},
pass: true,
},
{
desc: "2 elements RR not RR, mistake in first iter",
want: []balancer.SubConn{sc1, sc2},
got: []balancer.SubConn{sc1, sc1, sc1, sc2, sc1, sc2},
pass: false,
},
{
desc: "2 elements RR not RR, mistake in second iter",
want: []balancer.SubConn{sc1, sc2},
got: []balancer.SubConn{sc1, sc2, sc1, sc1, sc1, sc2},
pass: false,
},
{
desc: "2 elements weighted RR",
want: []balancer.SubConn{sc1, sc1, sc2},
got: []balancer.SubConn{sc1, sc1, sc2, sc1, sc1, sc2},
pass: true,
},
{
desc: "2 elements weighted RR different order",
want: []balancer.SubConn{sc1, sc1, sc2},
got: []balancer.SubConn{sc1, sc2, sc1, sc1, sc2, sc1},
pass: true,
},
{
desc: "3 elements RR",
want: []balancer.SubConn{sc1, sc2, sc3},
got: []balancer.SubConn{sc1, sc2, sc3, sc1, sc2, sc3, sc1, sc2, sc3},
pass: true,
},
{
desc: "3 elements RR different order",
want: []balancer.SubConn{sc1, sc2, sc3},
got: []balancer.SubConn{sc3, sc2, sc1, sc3, sc2, sc1},
pass: true,
},
{
desc: "3 elements weighted RR",
want: []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc3},
got: []balancer.SubConn{sc1, sc2, sc3, sc1, sc2, sc1, sc1, sc2, sc3, sc1, sc2, sc1},
pass: true,
},
{
desc: "3 elements weighted RR not RR, mistake in first iter",
want: []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc3},
got: []balancer.SubConn{sc1, sc2, sc1, sc1, sc2, sc1, sc1, sc2, sc3, sc1, sc2, sc1},
pass: false,
},
{
desc: "3 elements weighted RR not RR, mistake in second iter",
want: []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc3},
got: []balancer.SubConn{sc1, sc2, sc3, sc1, sc2, sc1, sc1, sc1, sc3, sc1, sc2, sc1},
pass: false,
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
err := IsRoundRobin(tC.want, (&testClosure{r: tC.got}).next)
if err == nil != tC.pass {
t.Errorf("want pass %v, want %v, got err %v", tC.pass, tC.want, err)
}
})
}
}