From a906ca0441ceb1f7cd4f5c7de30b8e81ce2ff5e8 Mon Sep 17 00:00:00 2001
From: Menghan Li
Date: Thu, 30 Apr 2020 14:24:06 -0700
Subject: [PATCH] balancergroup: fix connectivity state (#3585)

- Stop sending an empty update to sub-balancers at init time
- At init, when only one sub-balancer reports transient failure, wait for the
  other sub-balancers
- When aggregating states, treat a sub-balancer that transitions from
  TransientFailure to Connecting as still being in TransientFailure, so the
  aggregated state doesn't stay Connecting for a long time
---
 .../balancer/balancergroup/balancergroup.go   | 83 ++++++++++++++-----
 .../balancergroup/balancergroup_test.go       | 76 +++++++++++++++++
 2 files changed, 136 insertions(+), 23 deletions(-)

diff --git a/xds/internal/balancer/balancergroup/balancergroup.go b/xds/internal/balancer/balancergroup/balancergroup.go
index 44468c10..b08a5864 100644
--- a/xds/internal/balancer/balancergroup/balancergroup.go
+++ b/xds/internal/balancer/balancergroup/balancergroup.go
@@ -62,7 +62,12 @@ type subBalancerWithConfig struct {
     // The static part of sub-balancer. Keeps balancerBuilders and addresses.
     // To be used when restarting sub-balancer.
     builder balancer.Builder
-    ccState balancer.ClientConnState
+    // ccState is a cache of the addresses/balancer config, so when the balancer
+    // is restarted after close, it will get the previous update. It's a pointer
+    // and is set to nil at init, so when the balancer is built for the first
+    // time (not a restart), it won't receive an empty update. Note that this
+    // isn't reset to nil when the underlying balancer is closed.
+    ccState *balancer.ClientConnState
     // The dynamic part of sub-balancer. Only used when balancer group is
     // started. Gets cleared when sub-balancer is closed.
     balancer balancer.Balancer
@@ -94,7 +99,9 @@ func (sbc *subBalancerWithConfig) startBalancer() {
     b := sbc.builder.Build(sbc, balancer.BuildOptions{})
     sbc.group.logger.Infof("Created child policy %p of type %v", b, sbc.builder.Name())
     sbc.balancer = b
-    b.UpdateClientConnState(sbc.ccState)
+    if sbc.ccState != nil {
+        b.UpdateClientConnState(*sbc.ccState)
+    }
 }
 
 func (sbc *subBalancerWithConfig) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
@@ -110,7 +117,7 @@ func (sbc *subBalancerWithConfig) updateSubConnState(sc balancer.SubConn, state
 }
 
 func (sbc *subBalancerWithConfig) updateClientConnState(s balancer.ClientConnState) error {
-    sbc.ccState = s
+    sbc.ccState = &s
     b := sbc.balancer
     if b == nil {
         // This sub-balancer was closed. This should never happen because
@@ -149,12 +156,17 @@ func (sbc *subBalancerWithConfig) stopBalancer() {
 
 type pickerState struct {
     weight uint32
-    picker balancer.Picker
-    state  connectivity.State
+    state  balancer.State
+    // stateToAggregate is the connectivity state used only for state
+    // aggregation. It could be different from state.ConnectivityState. For
+    // example when a sub-balancer transitions from TransientFailure to
+    // connecting, state.ConnectivityState is Connecting, but stateToAggregate
+    // is still TransientFailure.
+    stateToAggregate connectivity.State
 }
 
 func (s *pickerState) String() string {
-    return fmt.Sprintf("weight:%v,picker:%p,state:%v", s.weight, s.picker, s.state)
+    return fmt.Sprintf("weight:%v,picker:%p,state:%v,stateToAggregate:%v", s.weight, s.state.Picker, s.state.ConnectivityState, s.stateToAggregate)
 }
 
 // BalancerGroup takes a list of balancers, and make them into one balancer.
@@ -288,10 +300,14 @@ func (bg *BalancerGroup) Add(id internal.LocalityID, weight uint32, builder bala
     bg.incomingMu.Lock()
     bg.idToPickerState[id] = &pickerState{
         weight: weight,
-        // Start everything in IDLE. It's doesn't affect the overall state
-        // because we don't count IDLE when aggregating (as opposite to e.g.
-        // READY, 1 READY results in overall READY).
-        state: connectivity.Idle,
+        // Start everything in CONNECTING, so if one of the sub-balancers
+        // reports TransientFailure, the RPCs will still wait for the other
+        // sub-balancers.
+        state: balancer.State{
+            ConnectivityState: connectivity.Connecting,
+            Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
+        },
+        stateToAggregate: connectivity.Connecting,
     }
     bg.incomingMu.Unlock()
 
@@ -518,13 +534,18 @@ func (bg *BalancerGroup) updateBalancerState(id internal.LocalityID, state balan
         bg.logger.Warningf("balancer group: pickerState for %v not found when update picker/state", id)
         return
     }
-    newPicker := state.Picker
     if bg.loadStore != nil {
         // Only wrap the picker to do load reporting if loadStore was set.
-        newPicker = newLoadReportPicker(state.Picker, id, bg.loadStore)
+        state.Picker = newLoadReportPicker(state.Picker, id, bg.loadStore)
     }
-    pickerSt.picker = newPicker
-    pickerSt.state = state.ConnectivityState
+    if !(pickerSt.state.ConnectivityState == connectivity.TransientFailure && state.ConnectivityState == connectivity.Connecting) {
+        // If old state is TransientFailure, and new state is Connecting, don't
+        // update the state, to prevent the aggregated state from being always
+        // CONNECTING. Otherwise, stateToAggregate is the same as
+        // state.ConnectivityState.
+        pickerSt.stateToAggregate = state.ConnectivityState
+    }
+    pickerSt.state = state
     if bg.incomingStarted {
         bg.logger.Infof("Child pickers with weight: %+v", bg.idToPickerState)
         bg.cc.UpdateState(buildPickerAndState(bg.idToPickerState))
@@ -539,10 +560,13 @@ func (bg *BalancerGroup) Close() {
         bg.incomingStarted = false
 
         for _, pState := range bg.idToPickerState {
-            // Reset everything to IDLE but keep the entry in map (to keep the
-            // weight).
-            pState.picker = nil
-            pState.state = connectivity.Idle
+            // Reset everything to init state (Connecting) but keep the entry in
+            // map (to keep the weight).
+            pState.state = balancer.State{
+                ConnectivityState: connectivity.Connecting,
+                Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
+            }
+            pState.stateToAggregate = connectivity.Connecting
         }
 
         // Also remove all SubConns.
@@ -570,7 +594,7 @@ func buildPickerAndState(m map[internal.LocalityID]*pickerState) balancer.State
     var readyN, connectingN int
     readyPickerWithWeights := make([]pickerState, 0, len(m))
     for _, ps := range m {
-        switch ps.state {
+        switch ps.stateToAggregate {
         case connectivity.Ready:
             readyN++
             readyPickerWithWeights = append(readyPickerWithWeights, *ps)
@@ -587,10 +611,23 @@ func buildPickerAndState(m map[internal.LocalityID]*pickerState) balancer.State
     default:
         aggregatedState = connectivity.TransientFailure
     }
-    if aggregatedState == connectivity.TransientFailure {
-        return balancer.State{ConnectivityState: aggregatedState, Picker: base.NewErrPicker(balancer.ErrTransientFailure)}
+
+    // Make sure picker's return error is consistent with the aggregatedState.
+    //
+    // TODO: This is true for balancers like weighted_target, but not for
+    // routing. For routing, we want to always build picker with all sub-pickers
+    // (not even ready sub-pickers), so even if the overall state is Ready, pick
+    // for certain RPCs can behave like Connecting or TransientFailure.
+    var picker balancer.Picker
+    switch aggregatedState {
+    case connectivity.TransientFailure:
+        picker = base.NewErrPicker(balancer.ErrTransientFailure)
+    case connectivity.Connecting:
+        picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable)
+    default:
+        picker = newPickerGroup(readyPickerWithWeights)
     }
-    return balancer.State{ConnectivityState: aggregatedState, Picker: newPickerGroup(readyPickerWithWeights)}
+    return balancer.State{ConnectivityState: aggregatedState, Picker: picker}
 }
 
 // NewRandomWRR is the WRR constructor used to pick sub-pickers from
@@ -612,7 +649,7 @@ type pickerGroup struct {
 func newPickerGroup(readyPickerWithWeights []pickerState) *pickerGroup {
     w := NewRandomWRR()
     for _, ps := range readyPickerWithWeights {
-        w.Add(ps.picker, int64(ps.weight))
+        w.Add(ps.state.Picker, int64(ps.weight))
     }
 
     return &pickerGroup{
diff --git a/xds/internal/balancer/balancergroup/balancergroup_test.go b/xds/internal/balancer/balancergroup/balancergroup_test.go
index 8c211c04..8fda50bc 100644
--- a/xds/internal/balancer/balancergroup/balancergroup_test.go
+++ b/xds/internal/balancer/balancergroup/balancergroup_test.go
@@ -23,6 +23,7 @@ import (
 
     orcapb "github.com/cncf/udpa/go/udpa/data/orca/v1"
     "github.com/google/go-cmp/cmp"
+    "google.golang.org/grpc"
     "google.golang.org/grpc/balancer"
     "google.golang.org/grpc/balancer/roundrobin"
     "google.golang.org/grpc/connectivity"
@@ -33,6 +34,7 @@ import (
 
 var (
     rrBuilder        = balancer.Get(roundrobin.Name)
+    pfBuilder        = balancer.Get(grpc.PickFirstBalancerName)
     testBalancerIDs  = []internal.LocalityID{{Region: "b1"}, {Region: "b2"}, {Region: "b3"}}
     testBackendAddrs []resolver.Address
 )
@@ -551,6 +553,80 @@ func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
     bg.Start()
 }
 
+// Test that at init time, with two sub-balancers, if one sub-balancer reports
+// transient_failure, the picks won't fail with transient_failure, and should
+// instead wait for the other sub-balancer.
+func (s) TestBalancerGroup_InitOneSubBalancerTransientFailure(t *testing.T) {
+    cc := testutils.NewTestClientConn(t)
+    bg := New(cc, nil, nil)
+    bg.Start()
+
+    // Add two balancers to group and send one resolved address to both
+    // balancers.
+    bg.Add(testBalancerIDs[0], 1, rrBuilder)
+    bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
+    sc1 := <-cc.NewSubConnCh
+
+    bg.Add(testBalancerIDs[1], 1, rrBuilder)
+    bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
+    <-cc.NewSubConnCh
+
+    // Set one subconn to TransientFailure, this will trigger one sub-balancer
+    // to report transient failure.
+    bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
+
+    p1 := <-cc.NewPickerCh
+    for i := 0; i < 5; i++ {
+        r, err := p1.Pick(balancer.PickInfo{})
+        if err != balancer.ErrNoSubConnAvailable {
+            t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrNoSubConnAvailable, r, err)
+        }
+    }
+}
+
+// Test that with two sub-balancers, both in transient_failure, if one turns
+// connecting, the overall state stays in transient_failure, and all picks
+// return transient failure error.
+func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *testing.T) {
+    cc := testutils.NewTestClientConn(t)
+    bg := New(cc, nil, nil)
+    bg.Start()
+
+    // Add two balancers to group and send one resolved address to both
+    // balancers.
+    bg.Add(testBalancerIDs[0], 1, pfBuilder)
+    bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
+    sc1 := <-cc.NewSubConnCh
+
+    bg.Add(testBalancerIDs[1], 1, pfBuilder)
+    bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
+    sc2 := <-cc.NewSubConnCh
+
+    // Set both subconn to TransientFailure, this will put both sub-balancers in
+    // transient failure.
+    bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
+    bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
+
+    p1 := <-cc.NewPickerCh
+    for i := 0; i < 5; i++ {
+        r, err := p1.Pick(balancer.PickInfo{})
+        if err != balancer.ErrTransientFailure {
+            t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrTransientFailure, r, err)
+        }
+    }
+
+    // Set one subconn to Connecting, it shouldn't change the overall state.
+    bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+
+    p2 := <-cc.NewPickerCh
+    for i := 0; i < 5; i++ {
+        r, err := p2.Pick(balancer.PickInfo{})
+        if err != balancer.ErrTransientFailure {
+            t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrTransientFailure, r, err)
+        }
+    }
+}
+
 func replaceDefaultSubBalancerCloseTimeout(n time.Duration) func() {
     old := DefaultSubBalancerCloseTimeout
     DefaultSubBalancerCloseTimeout = n
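
For readers who want the behavior change in isolation, the following is a minimal, self-contained Go sketch of the two rules described in the commit message: children start in Connecting rather than Idle, and a child that moves from TransientFailure back to Connecting keeps counting as TransientFailure when the group state is aggregated. This is an illustration only, not part of the patch; the childState and aggregate names are invented here, and only the public google.golang.org/grpc/connectivity package is assumed. The real logic lives in pickerState, updateBalancerState, and buildPickerAndState above.

// Standalone sketch (not the balancergroup implementation); it mirrors the
// aggregation rules introduced by this patch.
package main

import (
	"fmt"

	"google.golang.org/grpc/connectivity"
)

// childState tracks the state a child last reported and the state used for
// aggregation, which deliberately lags behind when a child goes from
// TransientFailure back to Connecting.
type childState struct {
	reported         connectivity.State
	stateToAggregate connectivity.State
}

// update records a new report. A TransientFailure -> Connecting transition
// keeps stateToAggregate at TransientFailure so the aggregate doesn't flip
// back to Connecting while the child is merely re-dialing.
func (c *childState) update(s connectivity.State) {
	if !(c.stateToAggregate == connectivity.TransientFailure && s == connectivity.Connecting) {
		c.stateToAggregate = s
	}
	c.reported = s
}

// aggregate applies the same precedence the patch uses: any Ready child makes
// the group Ready, otherwise any Connecting child makes it Connecting,
// otherwise the group is TransientFailure.
func aggregate(children []childState) connectivity.State {
	var readyN, connectingN int
	for _, c := range children {
		switch c.stateToAggregate {
		case connectivity.Ready:
			readyN++
		case connectivity.Connecting:
			connectingN++
		}
	}
	switch {
	case readyN > 0:
		return connectivity.Ready
	case connectingN > 0:
		return connectivity.Connecting
	default:
		return connectivity.TransientFailure
	}
}

func main() {
	// Children start in Connecting, matching the patch's init behavior.
	children := []childState{
		{reported: connectivity.Connecting, stateToAggregate: connectivity.Connecting},
		{reported: connectivity.Connecting, stateToAggregate: connectivity.Connecting},
	}

	// One child fails at init: the aggregate stays Connecting, so RPCs keep
	// waiting for the other child instead of failing fast.
	children[0].update(connectivity.TransientFailure)
	fmt.Println(aggregate(children)) // CONNECTING

	// Both fail, then one starts reconnecting: the aggregate stays
	// TRANSIENT_FAILURE because stateToAggregate does not follow the
	// TransientFailure -> Connecting transition.
	children[1].update(connectivity.TransientFailure)
	children[0].update(connectivity.Connecting)
	fmt.Println(aggregate(children)) // TRANSIENT_FAILURE
}

The two scenarios in main print CONNECTING and then TRANSIENT_FAILURE, which is the same behavior the two tests added in balancergroup_test.go assert through the balancer group's pickers.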