balancergroup: fix connectivity state (#3585)
- Stop sending an empty update to sub-balancers at init time.
- At init, when only one sub-balancer reports transient failure, wait for the other sub-balancers.
- When aggregating states, treat a sub-balancer that turns Connecting from TransientFailure as still being in TransientFailure, so the aggregated state doesn't stay in Connecting for a long time (a standalone sketch of the resulting aggregation rule follows below).
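For context, the rule the diff implements in buildPickerAndState is: any READY child makes the group READY, otherwise any CONNECTING child makes it CONNECTING, otherwise the group is TRANSIENT_FAILURE. The per-child stateToAggregate feeds this rule and deliberately stays TransientFailure while a failed child retries. The sketch below only illustrates that rule and is not code from this commit; the aggregate helper and its signature are invented for the example.

package statesketch

import "google.golang.org/grpc/connectivity"

// aggregate is a hypothetical helper mirroring the aggregation rule used by
// buildPickerAndState: READY wins, then CONNECTING, otherwise
// TRANSIENT_FAILURE. Callers would pass each child's stateToAggregate, which
// remains TransientFailure even after that child re-enters Connecting, so the
// group does not report Connecting just because a failed child is retrying.
func aggregate(children []connectivity.State) connectivity.State {
	var readyN, connectingN int
	for _, s := range children {
		switch s {
		case connectivity.Ready:
			readyN++
		case connectivity.Connecting:
			connectingN++
		}
	}
	switch {
	case readyN > 0:
		return connectivity.Ready
	case connectingN > 0:
		return connectivity.Connecting
	default:
		return connectivity.TransientFailure
	}
}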
@@ -62,7 +62,12 @@ type subBalancerWithConfig struct {
 	// The static part of sub-balancer. Keeps balancerBuilders and addresses.
 	// To be used when restarting sub-balancer.
 	builder balancer.Builder
-	ccState balancer.ClientConnState
+	// ccState is a cache of the addresses/balancer config, so when the balancer
+	// is restarted after close, it will get the previous update. It's a pointer
+	// and is set to nil at init, so when the balancer is built for the first
+	// time (not a restart), it won't receive an empty update. Note that this
+	// isn't reset to nil when the underlying balancer is closed.
+	ccState *balancer.ClientConnState
 	// The dynamic part of sub-balancer. Only used when balancer group is
 	// started. Gets cleared when sub-balancer is closed.
 	balancer balancer.Balancer
@@ -94,7 +99,9 @@ func (sbc *subBalancerWithConfig) startBalancer() {
 	b := sbc.builder.Build(sbc, balancer.BuildOptions{})
 	sbc.group.logger.Infof("Created child policy %p of type %v", b, sbc.builder.Name())
 	sbc.balancer = b
-	b.UpdateClientConnState(sbc.ccState)
+	if sbc.ccState != nil {
+		b.UpdateClientConnState(*sbc.ccState)
+	}
 }
 
 func (sbc *subBalancerWithConfig) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
@@ -110,7 +117,7 @@ func (sbc *subBalancerWithConfig) updateSubConnState(sc balancer.SubConn, state
 }
 
 func (sbc *subBalancerWithConfig) updateClientConnState(s balancer.ClientConnState) error {
-	sbc.ccState = s
+	sbc.ccState = &s
 	b := sbc.balancer
 	if b == nil {
 		// This sub-balancer was closed. This should never happen because
@@ -149,12 +156,17 @@ func (sbc *subBalancerWithConfig) stopBalancer() {
 
 type pickerState struct {
 	weight uint32
-	picker balancer.Picker
-	state  connectivity.State
+	state  balancer.State
+	// stateToAggregate is the connectivity state used only for state
+	// aggregation. It could be different from state.ConnectivityState. For
+	// example when a sub-balancer transitions from TransientFailure to
+	// connecting, state.ConnectivityState is Connecting, but stateToAggregate
+	// is still TransientFailure.
+	stateToAggregate connectivity.State
 }
 
 func (s *pickerState) String() string {
-	return fmt.Sprintf("weight:%v,picker:%p,state:%v", s.weight, s.picker, s.state)
+	return fmt.Sprintf("weight:%v,picker:%p,state:%v,stateToAggregate:%v", s.weight, s.state.Picker, s.state.ConnectivityState, s.stateToAggregate)
 }
 
 // BalancerGroup takes a list of balancers, and make them into one balancer.
@@ -288,10 +300,14 @@ func (bg *BalancerGroup) Add(id internal.LocalityID, weight uint32, builder bala
 	bg.incomingMu.Lock()
 	bg.idToPickerState[id] = &pickerState{
 		weight: weight,
-		// Start everything in IDLE. It's doesn't affect the overall state
-		// because we don't count IDLE when aggregating (as opposite to e.g.
-		// READY, 1 READY results in overall READY).
-		state: connectivity.Idle,
+		// Start everything in CONNECTING, so if one of the sub-balancers
+		// reports TransientFailure, the RPCs will still wait for the other
+		// sub-balancers.
+		state: balancer.State{
+			ConnectivityState: connectivity.Connecting,
+			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
+		},
+		stateToAggregate: connectivity.Connecting,
 	}
 	bg.incomingMu.Unlock()
 
@@ -518,13 +534,18 @@ func (bg *BalancerGroup) updateBalancerState(id internal.LocalityID, state balan
 		bg.logger.Warningf("balancer group: pickerState for %v not found when update picker/state", id)
 		return
 	}
-	newPicker := state.Picker
 	if bg.loadStore != nil {
 		// Only wrap the picker to do load reporting if loadStore was set.
-		newPicker = newLoadReportPicker(state.Picker, id, bg.loadStore)
+		state.Picker = newLoadReportPicker(state.Picker, id, bg.loadStore)
 	}
-	pickerSt.picker = newPicker
-	pickerSt.state = state.ConnectivityState
+	if !(pickerSt.state.ConnectivityState == connectivity.TransientFailure && state.ConnectivityState == connectivity.Connecting) {
+		// If old state is TransientFailure, and new state is Connecting, don't
+		// update the state, to prevent the aggregated state from being always
+		// CONNECTING. Otherwise, stateToAggregate is the same as
+		// state.ConnectivityState.
+		pickerSt.stateToAggregate = state.ConnectivityState
+	}
+	pickerSt.state = state
 	if bg.incomingStarted {
 		bg.logger.Infof("Child pickers with weight: %+v", bg.idToPickerState)
 		bg.cc.UpdateState(buildPickerAndState(bg.idToPickerState))
@@ -539,10 +560,13 @@ func (bg *BalancerGroup) Close() {
 		bg.incomingStarted = false
 
 		for _, pState := range bg.idToPickerState {
-			// Reset everything to IDLE but keep the entry in map (to keep the
-			// weight).
-			pState.picker = nil
-			pState.state = connectivity.Idle
+			// Reset everything to init state (Connecting) but keep the entry in
+			// map (to keep the weight).
+			pState.state = balancer.State{
+				ConnectivityState: connectivity.Connecting,
+				Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
+			}
+			pState.stateToAggregate = connectivity.Connecting
 		}
 
 		// Also remove all SubConns.
@@ -570,7 +594,7 @@ func buildPickerAndState(m map[internal.LocalityID]*pickerState) balancer.State
 	var readyN, connectingN int
 	readyPickerWithWeights := make([]pickerState, 0, len(m))
 	for _, ps := range m {
-		switch ps.state {
+		switch ps.stateToAggregate {
 		case connectivity.Ready:
 			readyN++
 			readyPickerWithWeights = append(readyPickerWithWeights, *ps)
@@ -587,10 +611,23 @@ func buildPickerAndState(m map[internal.LocalityID]*pickerState) balancer.State
 	default:
 		aggregatedState = connectivity.TransientFailure
 	}
-	if aggregatedState == connectivity.TransientFailure {
-		return balancer.State{ConnectivityState: aggregatedState, Picker: base.NewErrPicker(balancer.ErrTransientFailure)}
+
+	// Make sure picker's return error is consistent with the aggregatedState.
+	//
+	// TODO: This is true for balancers like weighted_target, but not for
+	// routing. For routing, we want to always build picker with all sub-pickers
+	// (not even ready sub-pickers), so even if the overall state is Ready, pick
+	// for certain RPCs can behave like Connecting or TransientFailure.
+	var picker balancer.Picker
+	switch aggregatedState {
+	case connectivity.TransientFailure:
+		picker = base.NewErrPicker(balancer.ErrTransientFailure)
+	case connectivity.Connecting:
+		picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable)
+	default:
+		picker = newPickerGroup(readyPickerWithWeights)
 	}
-	return balancer.State{ConnectivityState: aggregatedState, Picker: newPickerGroup(readyPickerWithWeights)}
+	return balancer.State{ConnectivityState: aggregatedState, Picker: picker}
 }
 
 // NewRandomWRR is the WRR constructor used to pick sub-pickers from
@@ -612,7 +649,7 @@ type pickerGroup struct {
 func newPickerGroup(readyPickerWithWeights []pickerState) *pickerGroup {
 	w := NewRandomWRR()
 	for _, ps := range readyPickerWithWeights {
-		w.Add(ps.picker, int64(ps.weight))
+		w.Add(ps.state.Picker, int64(ps.weight))
 	}
 
 	return &pickerGroup{
@@ -23,6 +23,7 @@ import (
 
 	orcapb "github.com/cncf/udpa/go/udpa/data/orca/v1"
 	"github.com/google/go-cmp/cmp"
+	"google.golang.org/grpc"
 	"google.golang.org/grpc/balancer"
 	"google.golang.org/grpc/balancer/roundrobin"
 	"google.golang.org/grpc/connectivity"
@@ -33,6 +34,7 @@ import (
 
 var (
 	rrBuilder        = balancer.Get(roundrobin.Name)
+	pfBuilder        = balancer.Get(grpc.PickFirstBalancerName)
 	testBalancerIDs  = []internal.LocalityID{{Region: "b1"}, {Region: "b2"}, {Region: "b3"}}
 	testBackendAddrs []resolver.Address
 )
@@ -551,6 +553,80 @@ func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
 	bg.Start()
 }
 
+// Test that at init time, with two sub-balancers, if one sub-balancer reports
+// transient_failure, the picks won't fail with transient_failure, and should
+// instead wait for the other sub-balancer.
+func (s) TestBalancerGroup_InitOneSubBalancerTransientFailure(t *testing.T) {
+	cc := testutils.NewTestClientConn(t)
+	bg := New(cc, nil, nil)
+	bg.Start()
+
+	// Add two balancers to group and send one resolved address to both
+	// balancers.
+	bg.Add(testBalancerIDs[0], 1, rrBuilder)
+	bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
+	sc1 := <-cc.NewSubConnCh
+
+	bg.Add(testBalancerIDs[1], 1, rrBuilder)
+	bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
+	<-cc.NewSubConnCh
+
+	// Set one subconn to TransientFailure, this will trigger one sub-balancer
+	// to report transient failure.
+	bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
+
+	p1 := <-cc.NewPickerCh
+	for i := 0; i < 5; i++ {
+		r, err := p1.Pick(balancer.PickInfo{})
+		if err != balancer.ErrNoSubConnAvailable {
+			t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrNoSubConnAvailable, r, err)
+		}
+	}
+}
+
+// Test that with two sub-balancers, both in transient_failure, if one turns
+// connecting, the overall state stays in transient_failure, and all picks
+// return transient failure error.
+func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *testing.T) {
+	cc := testutils.NewTestClientConn(t)
+	bg := New(cc, nil, nil)
+	bg.Start()
+
+	// Add two balancers to group and send one resolved address to both
+	// balancers.
+	bg.Add(testBalancerIDs[0], 1, pfBuilder)
+	bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
+	sc1 := <-cc.NewSubConnCh
+
+	bg.Add(testBalancerIDs[1], 1, pfBuilder)
+	bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
+	sc2 := <-cc.NewSubConnCh
+
+	// Set both subconn to TransientFailure, this will put both sub-balancers in
+	// transient failure.
+	bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
+	bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
+
+	p1 := <-cc.NewPickerCh
+	for i := 0; i < 5; i++ {
+		r, err := p1.Pick(balancer.PickInfo{})
+		if err != balancer.ErrTransientFailure {
+			t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrTransientFailure, r, err)
+		}
+	}
+
+	// Set one subconn to Connecting, it shouldn't change the overall state.
+	bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+
+	p2 := <-cc.NewPickerCh
+	for i := 0; i < 5; i++ {
+		r, err := p2.Pick(balancer.PickInfo{})
+		if err != balancer.ErrTransientFailure {
+			t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrTransientFailure, r, err)
+		}
+	}
+}
+
 func replaceDefaultSubBalancerCloseTimeout(n time.Duration) func() {
 	old := DefaultSubBalancerCloseTimeout
 	DefaultSubBalancerCloseTimeout = n