balancergroup: move balancer state (connectivity state and picker) building out (#3685)

Menghan Li
2020-07-08 17:41:40 -07:00
committed by GitHub
parent 4258d12073
commit 9af290fac4
9 changed files with 509 additions and 311 deletions

View File

@ -25,11 +25,9 @@ import (
orcapb "github.com/cncf/udpa/go/udpa/data/orca/v1"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/cache"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/lrs"
@ -154,21 +152,6 @@ func (sbc *subBalancerWrapper) stopBalancer() {
sbc.balancer = nil
}
type pickerState struct {
weight uint32
state balancer.State
// stateToAggregate is the connectivity state used only for state
// aggregation. It could be different from state.ConnectivityState. For
// example when a sub-balancer transitions from TransientFailure to
// connecting, state.ConnectivityState is Connecting, but stateToAggregate
// is still TransientFailure.
stateToAggregate connectivity.State
}
func (s *pickerState) String() string {
return fmt.Sprintf("weight:%v,picker:%p,state:%v,stateToAggregate:%v", s.weight, s.state.Picker, s.state.ConnectivityState, s.stateToAggregate)
}
// BalancerGroup takes a list of balancers, and makes them into one balancer.
//
// Note that this struct doesn't implement balancer.Balancer, because it's not
@ -185,8 +168,9 @@ func (s *pickerState) String() string {
// Actions from sub-balancers are forwarded to parent ClientConn
// - new/remove SubConn
// - picker update and health states change
// - sub-pickers are grouped into a group-picker
// - aggregated connectivity state is the overall state of all pickers.
// - sub-pickers are sent to an aggregator provided by the parent, which
// will group them into a group-picker. The aggregated connectivity state is
// also handled by the aggregator.
// - resolveNow
//
// Sub-balancers are only built when the balancer group is started. If the
@ -198,6 +182,11 @@ type BalancerGroup struct {
logger *grpclog.PrefixLogger
loadStore lrs.Store
// stateAggregator is where the state/picker updates will be sent to. It's
// provided by the parent balancer, to build a picker with all the
// sub-pickers.
stateAggregator BalancerStateAggregator
// outgoingMu guards all operations in the direction:
// ClientConn-->Sub-balancer. Including start, stop, resolver updates and
// SubConn state changes.
@ -210,8 +199,8 @@ type BalancerGroup struct {
// Cache for sub-balancers when they are removed.
balancerCache *cache.TimeoutCache
// incomingMu and pickerMu are to make sure this balancer group doesn't send
// updates to cc after it's closed.
// incomingMu is to make sure this balancer group doesn't send updates to cc
// after it's closed.
//
// We don't share the mutex to avoid deadlocks (e.g. a call to sub-balancer
// may call back to balancer group inline. It causes deadlock if they
@ -220,25 +209,22 @@ type BalancerGroup struct {
// We should never need to hold multiple locks at the same time in this
// struct. The case where two locks are held can only happen when the
// underlying balancer calls back into balancer group inline. So there's an
// implicit lock acquisition order that outgoingMu is locked before either
// incomingMu or pickerMu.
// implicit lock acquisition order that outgoingMu is locked before
// incomingMu.
// incomingMu guards all operations in the direction:
// Sub-balancer-->ClientConn. Including NewSubConn, RemoveSubConn, and
// updatePicker. It also guards the map from SubConn to balancer ID, so
// updateSubConnState needs to hold it shortly to find the
// sub-balancer to forward the update.
// Sub-balancer-->ClientConn. Including NewSubConn, RemoveSubConn. It also
// guards the map from SubConn to balancer ID, so updateSubConnState needs
// to hold it shortly to find the sub-balancer to forward the update.
//
// UpdateState is called by the balancer state aggregator, which decides
// when and whether to call it.
//
// The corresponding boolean incomingStarted is used to stop further updates
// from sub-balancers after they are closed.
incomingMu sync.Mutex
incomingStarted bool // This boolean only guards calls back to ClientConn.
scToSubBalancer map[balancer.SubConn]*subBalancerWrapper
// All balancer IDs exist as keys in this map, even if balancer group is not
// started.
//
// If an ID is not in map, it's either removed or never added.
idToPickerState map[internal.LocalityID]*pickerState
}
// DefaultSubBalancerCloseTimeout is defined as a variable instead of const for
@ -249,16 +235,17 @@ var DefaultSubBalancerCloseTimeout = 15 * time.Minute
// New creates a new BalancerGroup. Note that the BalancerGroup
// needs to be started to work.
func New(cc balancer.ClientConn, loadStore lrs.Store, logger *grpclog.PrefixLogger) *BalancerGroup {
func New(cc balancer.ClientConn, stateAggregator BalancerStateAggregator, loadStore lrs.Store, logger *grpclog.PrefixLogger) *BalancerGroup {
return &BalancerGroup{
cc: cc,
logger: logger,
loadStore: loadStore,
stateAggregator: stateAggregator,
idToBalancerConfig: make(map[internal.LocalityID]*subBalancerWrapper),
balancerCache: cache.NewTimeoutCache(DefaultSubBalancerCloseTimeout),
scToSubBalancer: make(map[balancer.SubConn]*subBalancerWrapper),
idToPickerState: make(map[internal.LocalityID]*pickerState),
}
}
@ -286,31 +273,8 @@ func (bg *BalancerGroup) Start() {
bg.outgoingMu.Unlock()
}
// Add adds a balancer built by builder to the group, with given id and weight.
//
// weight should never be zero.
func (bg *BalancerGroup) Add(id internal.LocalityID, weight uint32, builder balancer.Builder) {
if weight == 0 {
bg.logger.Errorf("BalancerGroup.add called with weight 0, locality: %v. Locality is not added to balancer group", id)
return
}
// First, add things to the picker map. Do this even if incomingStarted is
// false, because the data is static.
bg.incomingMu.Lock()
bg.idToPickerState[id] = &pickerState{
weight: weight,
// Start everything in CONNECTING, so if one of the sub-balancers
// reports TransientFailure, the RPCs will still wait for the other
// sub-balancers.
state: balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
},
stateToAggregate: connectivity.Connecting,
}
bg.incomingMu.Unlock()
// Add adds a balancer built by builder to the group, with given id.
func (bg *BalancerGroup) Add(id internal.LocalityID, builder balancer.Builder) {
// Store data in static map, and then check to see if bg is started.
bg.outgoingMu.Lock()
var sbc *subBalancerWrapper
@ -361,9 +325,6 @@ func (bg *BalancerGroup) Add(id internal.LocalityID, weight uint32, builder bala
// But doesn't close the balancer. The balancer is kept in a cache, and will be
// closed after timeout. Cleanup work (closing sub-balancer and removing
// subconns) will be done after timeout.
//
// It also removes the picker generated from this balancer from the picker
// group. It always results in a picker update.
func (bg *BalancerGroup) Remove(id internal.LocalityID) {
bg.outgoingMu.Lock()
if sbToRemove, ok := bg.idToBalancerConfig[id]; ok {
@ -385,18 +346,6 @@ func (bg *BalancerGroup) Remove(id internal.LocalityID) {
bg.logger.Infof("balancer group: trying to remove a non-existing locality from balancer group: %v", id)
}
bg.outgoingMu.Unlock()
bg.incomingMu.Lock()
// Remove id and picker from picker map. This also results in future updates
// for this ID to be ignored.
delete(bg.idToPickerState, id)
if bg.incomingStarted {
// Normally picker update is triggered by SubConn state change. But we
// want to update state and picker to reflect the changes, too. Because
// we don't want `ClientConn` to pick this sub-balancer anymore.
bg.cc.UpdateState(buildPickerAndState(bg.idToPickerState))
}
bg.incomingMu.Unlock()
}
// bg.remove(id) doesn't do cleanup for the sub-balancer. This function does
@ -420,36 +369,6 @@ func (bg *BalancerGroup) cleanupSubConns(config *subBalancerWrapper) {
bg.incomingMu.Unlock()
}
// ChangeWeight changes the weight of the balancer.
//
// newWeight should never be zero.
//
// NOTE: It always results in a picker update now. This probably isn't
// necessary. But it seems better to do the update because it's a change in the
// picker (which is balancer's snapshot).
func (bg *BalancerGroup) ChangeWeight(id internal.LocalityID, newWeight uint32) {
if newWeight == 0 {
bg.logger.Errorf("BalancerGroup.changeWeight called with newWeight 0. Weight is not changed")
return
}
bg.incomingMu.Lock()
defer bg.incomingMu.Unlock()
pState, ok := bg.idToPickerState[id]
if !ok {
return
}
if pState.weight == newWeight {
return
}
pState.weight = newWeight
if bg.incomingStarted {
// Normally picker update is triggered by SubConn state change. But we
// want to update state and picker to reflect the changes, too. Because
// `ClientConn` should do pick with the new weights now.
bg.cc.UpdateState(buildPickerAndState(bg.idToPickerState))
}
}
// Following are actions from the parent grpc.ClientConn, forward to sub-balancers.
// UpdateSubConnState handles the state for the subconn. It finds the
@ -520,35 +439,22 @@ func (bg *BalancerGroup) newSubConn(config *subBalancerWrapper, addrs []resolver
return sc, nil
}
// updateBalancerState: create an aggregated picker and an aggregated
// connectivity state, then forward to ClientConn.
// updateBalancerState: forward the new state to balancer state aggregator. The
// aggregator will create an aggregated picker and an aggregated connectivity
// state, then forward to ClientConn.
func (bg *BalancerGroup) updateBalancerState(id internal.LocalityID, state balancer.State) {
bg.logger.Infof("Balancer state update from locality %v, new state: %+v", id, state)
bg.incomingMu.Lock()
defer bg.incomingMu.Unlock()
pickerSt, ok := bg.idToPickerState[id]
if !ok {
// All state starts in IDLE. If ID is not in map, it's either removed,
// or never existed.
bg.logger.Warningf("balancer group: pickerState for %v not found when update picker/state", id)
return
}
if bg.loadStore != nil {
// Only wrap the picker to do load reporting if loadStore was set.
state.Picker = newLoadReportPicker(state.Picker, id, bg.loadStore)
}
if !(pickerSt.state.ConnectivityState == connectivity.TransientFailure && state.ConnectivityState == connectivity.Connecting) {
// If old state is TransientFailure, and new state is Connecting, don't
// update the state, to prevent the aggregated state from being always
// CONNECTING. Otherwise, stateToAggregate is the same as
// state.ConnectivityState.
pickerSt.stateToAggregate = state.ConnectivityState
}
pickerSt.state = state
if bg.incomingStarted {
bg.logger.Infof("Child pickers with weight: %+v", bg.idToPickerState)
bg.cc.UpdateState(buildPickerAndState(bg.idToPickerState))
// Send the new state to the aggregator without holding incomingMu.
// incomingMu protects calls to the parent ClientConn; this update doesn't
// necessarily trigger such a call, and if it does, it's protected by the
// aggregator's own mutex.
if bg.stateAggregator != nil {
bg.stateAggregator.UpdateState(id, state)
}
}
@ -558,17 +464,6 @@ func (bg *BalancerGroup) Close() {
bg.incomingMu.Lock()
if bg.incomingStarted {
bg.incomingStarted = false
for _, pState := range bg.idToPickerState {
// Reset everything to init state (Connecting) but keep the entry in
// map (to keep the weight).
pState.state = balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
}
pState.stateToAggregate = connectivity.Connecting
}
// Also remove all SubConns.
for sc := range bg.scToSubBalancer {
bg.cc.RemoveSubConn(sc)
@ -590,82 +485,6 @@ func (bg *BalancerGroup) Close() {
bg.balancerCache.Clear(true)
}
func buildPickerAndState(m map[internal.LocalityID]*pickerState) balancer.State {
var readyN, connectingN int
readyPickerWithWeights := make([]pickerState, 0, len(m))
for _, ps := range m {
switch ps.stateToAggregate {
case connectivity.Ready:
readyN++
readyPickerWithWeights = append(readyPickerWithWeights, *ps)
case connectivity.Connecting:
connectingN++
}
}
var aggregatedState connectivity.State
switch {
case readyN > 0:
aggregatedState = connectivity.Ready
case connectingN > 0:
aggregatedState = connectivity.Connecting
default:
aggregatedState = connectivity.TransientFailure
}
// Make sure picker's return error is consistent with the aggregatedState.
//
// TODO: This is true for balancers like weighted_target, but not for
// routing. For routing, we want to always build picker with all sub-pickers
// (not even ready sub-pickers), so even if the overall state is Ready, pick
// for certain RPCs can behave like Connecting or TransientFailure.
var picker balancer.Picker
switch aggregatedState {
case connectivity.TransientFailure:
picker = base.NewErrPicker(balancer.ErrTransientFailure)
case connectivity.Connecting:
picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable)
default:
picker = newPickerGroup(readyPickerWithWeights)
}
return balancer.State{ConnectivityState: aggregatedState, Picker: picker}
}
// NewRandomWRR is the WRR constructor used to pick sub-pickers from
// sub-balancers. It's to be modified in tests.
var NewRandomWRR = wrr.NewRandom
type pickerGroup struct {
length int
w wrr.WRR
}
// newPickerGroup takes pickers with weights, and group them into one picker.
//
// Note it only takes ready pickers. The map shouldn't contain non-ready
// pickers.
//
// TODO: (bg) confirm this is the expected behavior: non-ready balancers should
// be ignored when picking. Only ready balancers are picked.
func newPickerGroup(readyPickerWithWeights []pickerState) *pickerGroup {
w := NewRandomWRR()
for _, ps := range readyPickerWithWeights {
w.Add(ps.state.Picker, int64(ps.weight))
}
return &pickerGroup{
length: len(readyPickerWithWeights),
w: w,
}
}
func (pg *pickerGroup) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
if pg.length <= 0 {
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
p := pg.w.Next().(balancer.Picker)
return p.Pick(info)
}
const (
serverLoadCPUName = "cpu_utilization"
serverLoadMemoryName = "mem_utilization"
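As a rough wiring sketch (not part of this diff; the helper names buildGroup and addLocality are hypothetical, and the concrete wiring for this commit is in weighted_target.go and eds_impl.go below), a parent balancer now creates the aggregator first, hands it to New, and registers each sub-balancer with both the aggregator (for its weight) and the group (for its builder). These are internal packages, so the sketch only compiles inside the grpc module.

// Hypothetical sketch of how a parent balancer wires the pieces together
// after this change. All calls below exist in this commit.
package parentsketch

import (
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/internal/grpclog"
	"google.golang.org/grpc/internal/wrr"
	"google.golang.org/grpc/xds/internal"
	"google.golang.org/grpc/xds/internal/balancer/balancergroup"
	"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
)

func buildGroup(cc balancer.ClientConn, logger *grpclog.PrefixLogger) (*weightedaggregator.Aggregator, *balancergroup.BalancerGroup) {
	// The aggregator owns picker/state building and is started first.
	gator := weightedaggregator.New(cc, logger, wrr.NewRandom)
	gator.Start()
	// The group no longer builds pickers; it forwards sub-balancer states
	// to the aggregator passed in here.
	bg := balancergroup.New(cc, gator, nil /* loadStore */, logger)
	bg.Start()
	return gator, bg
}

func addLocality(gator *weightedaggregator.Aggregator, bg *balancergroup.BalancerGroup, id internal.LocalityID, weight uint32, b balancer.Builder) {
	// The weight now lives in the aggregator; the group only needs the builder.
	gator.Add(id, weight)
	bg.Add(id, b)
}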

View File

@ -14,6 +14,15 @@
* limitations under the License.
*/
// All tests in this file are a combination of balancer group and
// weighted_balancerstate_aggregator, i.e. weighted_target tests. The difference
// is that weighted_target tests cannot add sub-balancers to the balancer group
// directly; they instead use the balancer config to control sub-balancers. Even
// though not a perfect fit, these tests still cover all the functionality.
//
// TODO: the tests should be moved to weighted_target, and balancer group's
// tests should use a mock balancerstate_aggregator.
package balancergroup
import (
@ -29,6 +38,8 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/lrs"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
"google.golang.org/grpc/xds/internal/testutils"
)
@ -58,14 +69,22 @@ func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
}
}
func newTestBalancerGroup(t *testing.T, loadStore lrs.Store) (*testutils.TestClientConn, *weightedaggregator.Aggregator, *BalancerGroup) {
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, gator, loadStore, nil)
bg.Start()
return cc, gator, bg
}
// 1 balancer, 1 backend -> 2 backends -> 1 backend.
func (s) TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) {
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
bg.Start()
cc, gator, bg := newTestBalancerGroup(t, nil)
// Add one balancer to group.
bg.Add(testBalancerIDs[0], 1, rrBuilder)
gator.Add(testBalancerIDs[0], 1)
bg.Add(testBalancerIDs[0], rrBuilder)
// Send one resolved address.
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
@ -117,17 +136,17 @@ func (s) TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) {
// 2 balancers, each with 1 backend.
func (s) TestBalancerGroup_TwoRR_OneBackend(t *testing.T) {
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
bg.Start()
cc, gator, bg := newTestBalancerGroup(t, nil)
// Add two balancers to group and send one resolved address to both
// balancers.
bg.Add(testBalancerIDs[0], 1, rrBuilder)
gator.Add(testBalancerIDs[0], 1)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
sc1 := <-cc.NewSubConnCh
bg.Add(testBalancerIDs[1], 1, rrBuilder)
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
sc2 := <-cc.NewSubConnCh
@ -147,18 +166,18 @@ func (s) TestBalancerGroup_TwoRR_OneBackend(t *testing.T) {
// 2 balancers, each with more than 1 backends.
func (s) TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) {
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
bg.Start()
cc, gator, bg := newTestBalancerGroup(t, nil)
// Add two balancers to group and send one resolved address to both
// balancers.
bg.Add(testBalancerIDs[0], 1, rrBuilder)
gator.Add(testBalancerIDs[0], 1)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
sc1 := <-cc.NewSubConnCh
sc2 := <-cc.NewSubConnCh
bg.Add(testBalancerIDs[1], 1, rrBuilder)
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
sc3 := <-cc.NewSubConnCh
sc4 := <-cc.NewSubConnCh
@ -232,18 +251,18 @@ func (s) TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) {
// 2 balancers with different weights.
func (s) TestBalancerGroup_TwoRR_DifferentWeight_MoreBackends(t *testing.T) {
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
bg.Start()
cc, gator, bg := newTestBalancerGroup(t, nil)
// Add two balancers to group and send two resolved addresses to both
// balancers.
bg.Add(testBalancerIDs[0], 2, rrBuilder)
gator.Add(testBalancerIDs[0], 2)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
sc1 := <-cc.NewSubConnCh
sc2 := <-cc.NewSubConnCh
bg.Add(testBalancerIDs[1], 1, rrBuilder)
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
sc3 := <-cc.NewSubConnCh
sc4 := <-cc.NewSubConnCh
@ -268,21 +287,22 @@ func (s) TestBalancerGroup_TwoRR_DifferentWeight_MoreBackends(t *testing.T) {
// totally 3 balancers, add/remove balancer.
func (s) TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) {
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
bg.Start()
cc, gator, bg := newTestBalancerGroup(t, nil)
// Add three balancers to the group and send one resolved address to each
// balancer.
bg.Add(testBalancerIDs[0], 1, rrBuilder)
gator.Add(testBalancerIDs[0], 1)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
sc1 := <-cc.NewSubConnCh
bg.Add(testBalancerIDs[1], 1, rrBuilder)
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[1:2]}})
sc2 := <-cc.NewSubConnCh
bg.Add(testBalancerIDs[2], 1, rrBuilder)
gator.Add(testBalancerIDs[2], 1)
bg.Add(testBalancerIDs[2], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[2], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[1:2]}})
sc3 := <-cc.NewSubConnCh
@ -301,7 +321,9 @@ func (s) TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) {
}
// Remove the second balancer, while the other two are ready.
gator.Remove(testBalancerIDs[1])
bg.Remove(testBalancerIDs[1])
gator.BuildAndUpdate()
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove)
@ -315,7 +337,9 @@ func (s) TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) {
// move balancer 3 into transient failure.
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
// Remove the first balancer, while the third is transient failure.
gator.Remove(testBalancerIDs[0])
bg.Remove(testBalancerIDs[0])
gator.BuildAndUpdate()
scToRemove = <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
@ -330,18 +354,18 @@ func (s) TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) {
// 2 balancers, change balancer weight.
func (s) TestBalancerGroup_TwoRR_ChangeWeight_MoreBackends(t *testing.T) {
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
bg.Start()
cc, gator, bg := newTestBalancerGroup(t, nil)
// Add two balancers to group and send two resolved addresses to both
// balancers.
bg.Add(testBalancerIDs[0], 2, rrBuilder)
gator.Add(testBalancerIDs[0], 2)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
sc1 := <-cc.NewSubConnCh
sc2 := <-cc.NewSubConnCh
bg.Add(testBalancerIDs[1], 1, rrBuilder)
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
sc3 := <-cc.NewSubConnCh
sc4 := <-cc.NewSubConnCh
@ -363,7 +387,8 @@ func (s) TestBalancerGroup_TwoRR_ChangeWeight_MoreBackends(t *testing.T) {
t.Fatalf("want %v, got %v", want, err)
}
bg.ChangeWeight(testBalancerIDs[0], 3)
gator.UpdateWeight(testBalancerIDs[0], 3)
gator.BuildAndUpdate()
// Test roundrobin with new weight.
p2 := <-cc.NewPickerCh
@ -375,23 +400,22 @@ func (s) TestBalancerGroup_TwoRR_ChangeWeight_MoreBackends(t *testing.T) {
func (s) TestBalancerGroup_LoadReport(t *testing.T) {
testLoadStore := testutils.NewTestLoadStore()
cc := testutils.NewTestClientConn(t)
bg := New(cc, testLoadStore, nil)
bg.Start()
cc, gator, bg := newTestBalancerGroup(t, testLoadStore)
backendToBalancerID := make(map[balancer.SubConn]internal.LocalityID)
// Add two balancers to group and send two resolved addresses to both
// balancers.
bg.Add(testBalancerIDs[0], 2, rrBuilder)
gator.Add(testBalancerIDs[0], 2)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
sc1 := <-cc.NewSubConnCh
sc2 := <-cc.NewSubConnCh
backendToBalancerID[sc1] = testBalancerIDs[0]
backendToBalancerID[sc2] = testBalancerIDs[0]
bg.Add(testBalancerIDs[1], 1, rrBuilder)
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
sc3 := <-cc.NewSubConnCh
sc4 := <-cc.NewSubConnCh
@ -459,13 +483,17 @@ func (s) TestBalancerGroup_LoadReport(t *testing.T) {
// Start the balancer group again and check for behavior.
func (s) TestBalancerGroup_start_close(t *testing.T) {
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, gator, nil, nil)
// Add two balancers to group and send two resolved addresses to both
// balancers.
bg.Add(testBalancerIDs[0], 2, rrBuilder)
gator.Add(testBalancerIDs[0], 2)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
bg.Add(testBalancerIDs[1], 1, rrBuilder)
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
bg.Start()
@ -490,22 +518,26 @@ func (s) TestBalancerGroup_start_close(t *testing.T) {
t.Fatalf("want %v, got %v", want, err)
}
gator.Stop()
bg.Close()
for i := 0; i < 4; i++ {
bg.UpdateSubConnState(<-cc.RemoveSubConnCh, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
}
// Add b3, weight 1, backends [1,2].
bg.Add(testBalancerIDs[2], 1, rrBuilder)
gator.Add(testBalancerIDs[2], 1)
bg.Add(testBalancerIDs[2], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[2], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[1:3]}})
// Remove b1.
gator.Remove(testBalancerIDs[0])
bg.Remove(testBalancerIDs[0])
// Update b2 to weight 3, backends [0,3].
bg.ChangeWeight(testBalancerIDs[1], 3)
gator.UpdateWeight(testBalancerIDs[1], 3)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: append([]resolver.Address(nil), testBackendAddrs[0], testBackendAddrs[3])}})
gator.Start()
bg.Start()
m2 := make(map[resolver.Address]balancer.SubConn)
@ -543,11 +575,15 @@ func (s) TestBalancerGroup_start_close(t *testing.T) {
// because of deadlock.
func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, gator, nil, nil)
bg.Add(testBalancerIDs[0], 2, &testutils.TestConstBalancerBuilder{})
gator.Add(testBalancerIDs[0], 2)
bg.Add(testBalancerIDs[0], &testutils.TestConstBalancerBuilder{})
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
bg.Add(testBalancerIDs[1], 1, &testutils.TestConstBalancerBuilder{})
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], &testutils.TestConstBalancerBuilder{})
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
bg.Start()
@ -557,17 +593,17 @@ func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
// transient_failure, the picks won't fail with transient_failure, and should
// instead wait for the other sub-balancer.
func (s) TestBalancerGroup_InitOneSubBalancerTransientFailure(t *testing.T) {
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
bg.Start()
cc, gator, bg := newTestBalancerGroup(t, nil)
// Add two balancers to group and send one resolved address to both
// balancers.
bg.Add(testBalancerIDs[0], 1, rrBuilder)
gator.Add(testBalancerIDs[0], 1)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
sc1 := <-cc.NewSubConnCh
bg.Add(testBalancerIDs[1], 1, rrBuilder)
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
<-cc.NewSubConnCh
@ -588,17 +624,17 @@ func (s) TestBalancerGroup_InitOneSubBalancerTransientFailure(t *testing.T) {
// connecting, the overall state stays in transient_failure, and all picks
// return transient failure error.
func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *testing.T) {
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
bg.Start()
cc, gator, bg := newTestBalancerGroup(t, nil)
// Add two balancers to group and send one resolved address to both
// balancers.
bg.Add(testBalancerIDs[0], 1, pfBuilder)
gator.Add(testBalancerIDs[0], 1)
bg.Add(testBalancerIDs[0], pfBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
sc1 := <-cc.NewSubConnCh
bg.Add(testBalancerIDs[1], 1, pfBuilder)
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], pfBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
sc2 := <-cc.NewSubConnCh
@ -639,15 +675,19 @@ func replaceDefaultSubBalancerCloseTimeout(n time.Duration) func() {
// Two rr balancers are added to bg, each with 2 ready subConns. A sub-balancer
// is removed later, so the balancer group returned has one sub-balancer in its
// own map, and one sub-balancer in cache.
func initBalancerGroupForCachingTest(t *testing.T) (*BalancerGroup, *testutils.TestClientConn, map[resolver.Address]balancer.SubConn) {
func initBalancerGroupForCachingTest(t *testing.T) (*weightedaggregator.Aggregator, *BalancerGroup, *testutils.TestClientConn, map[resolver.Address]balancer.SubConn) {
cc := testutils.NewTestClientConn(t)
bg := New(cc, nil, nil)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, gator, nil, nil)
// Add two balancers to group and send two resolved addresses to both
// balancers.
bg.Add(testBalancerIDs[0], 2, rrBuilder)
gator.Add(testBalancerIDs[0], 2)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
bg.Add(testBalancerIDs[1], 1, rrBuilder)
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
bg.Start()
@ -672,7 +712,9 @@ func initBalancerGroupForCachingTest(t *testing.T) (*BalancerGroup, *testutils.T
t.Fatalf("want %v, got %v", want, err)
}
gator.Remove(testBalancerIDs[1])
bg.Remove(testBalancerIDs[1])
gator.BuildAndUpdate()
// Don't wait for SubConns to be removed after close, because they are only
// removed after close timeout.
for i := 0; i < 10; i++ {
@ -692,14 +734,14 @@ func initBalancerGroupForCachingTest(t *testing.T) (*BalancerGroup, *testutils.T
t.Fatalf("want %v, got %v", want, err)
}
return bg, cc, m1
return gator, bg, cc, m1
}
// Test that if a sub-balancer is removed, and re-added within close timeout,
// the subConns won't be re-created.
func (s) TestBalancerGroup_locality_caching(t *testing.T) {
defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)()
bg, cc, addrToSC := initBalancerGroupForCachingTest(t)
gator, bg, cc, addrToSC := initBalancerGroupForCachingTest(t)
// Turn down subconn for addr2, shouldn't get picker update because
// sub-balancer1 was removed.
@ -719,7 +761,8 @@ func (s) TestBalancerGroup_locality_caching(t *testing.T) {
// Re-add sub-balancer-1. Because its subconns were in cache, no new subconns
// should be created. But a new picker will still be generated, with subconn
// states up to date.
bg.Add(testBalancerIDs[1], 1, rrBuilder)
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
p3 := <-cc.NewPickerCh
want := []balancer.SubConn{
@ -747,7 +790,7 @@ func (s) TestBalancerGroup_locality_caching(t *testing.T) {
// immediately.
func (s) TestBalancerGroup_locality_caching_close_group(t *testing.T) {
defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)()
bg, cc, addrToSC := initBalancerGroupForCachingTest(t)
_, bg, cc, addrToSC := initBalancerGroupForCachingTest(t)
bg.Close()
// The balancer group is closed. The subconns should be removed immediately.
@ -776,7 +819,7 @@ func (s) TestBalancerGroup_locality_caching_close_group(t *testing.T) {
// subConns will be removed.
func (s) TestBalancerGroup_locality_caching_not_readd_within_timeout(t *testing.T) {
defer replaceDefaultSubBalancerCloseTimeout(time.Second)()
_, cc, addrToSC := initBalancerGroupForCachingTest(t)
_, _, cc, addrToSC := initBalancerGroupForCachingTest(t)
// The sub-balancer is not re-added within the timeout. The subconns should be
// removed.
@ -808,13 +851,14 @@ type noopBalancerBuilderWrapper struct {
// builder. Old subconns should be removed, and new subconns should be created.
func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *testing.T) {
defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)()
bg, cc, addrToSC := initBalancerGroupForCachingTest(t)
gator, bg, cc, addrToSC := initBalancerGroupForCachingTest(t)
// Re-add sub-balancer-1, but with a different balancer builder. The
// sub-balancer is still in cache, but can't be reused. This should cause the
// old sub-balancer's subconns to be removed immediately, and new subconns
// to be created.
bg.Add(testBalancerIDs[1], 1, &noopBalancerBuilderWrapper{rrBuilder})
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], &noopBalancerBuilderWrapper{rrBuilder})
// The cached sub-balancer should be closed, and the subconns should be
// removed immediately.
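Per the TODO at the top of this file, balancer group tests could eventually use a mock balancerstate_aggregator instead of the weighted one. A possible sketch of such a test double (hypothetical, not in this commit; assumes the file's existing imports plus sync) that simply records the states forwarded by the group:

// fakeAggregator is a hypothetical BalancerStateAggregator test double: it
// records every state it receives so tests can assert on what the balancer
// group forwarded, without involving the weighted aggregator.
type fakeAggregator struct {
	mu     sync.Mutex
	states map[internal.LocalityID]balancer.State
}

func newFakeAggregator() *fakeAggregator {
	return &fakeAggregator{states: make(map[internal.LocalityID]balancer.State)}
}

func (f *fakeAggregator) UpdateState(id internal.LocalityID, state balancer.State) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.states[id] = state
}

func (f *fakeAggregator) lastState(id internal.LocalityID) (balancer.State, bool) {
	f.mu.Lock()
	defer f.mu.Unlock()
	s, ok := f.states[id]
	return s, ok
}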

View File

@ -0,0 +1,38 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package balancergroup
import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/xds/internal"
)
// BalancerStateAggregator aggregates sub-pickers and connectivity states into
// a single state.
//
// It takes care of merging sub-pickers into one picker. The picking config is
// passed directly from the parent to the aggregator implementation (instead
// of via the balancer group).
type BalancerStateAggregator interface {
// UpdateState updates the state of the id.
//
// It's up to the implementation whether this will trigger an update to the
// parent ClientConn.
UpdateState(id internal.LocalityID, state balancer.State)
}
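To illustrate the contract (this is not in the commit; the weighted aggregator further down is the real implementation), a minimal aggregator could cache the latest state per ID and report the first Ready child, falling back to a Connecting error picker. The sketch assumes imports of sync, balancer/base and connectivity in addition to those above.

// firstReadySketch is a hypothetical, minimal BalancerStateAggregator. It only
// shows what an implementation is responsible for: remembering per-ID states
// and deciding when to call the parent ClientConn's UpdateState.
type firstReadySketch struct {
	cc     balancer.ClientConn
	mu     sync.Mutex
	states map[internal.LocalityID]balancer.State
}

func newFirstReadySketch(cc balancer.ClientConn) *firstReadySketch {
	return &firstReadySketch{cc: cc, states: make(map[internal.LocalityID]balancer.State)}
}

func (a *firstReadySketch) UpdateState(id internal.LocalityID, state balancer.State) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.states[id] = state
	// Report the first Ready sub-balancer's state as the aggregate...
	for _, s := range a.states {
		if s.ConnectivityState == connectivity.Ready {
			a.cc.UpdateState(s)
			return
		}
	}
	// ...otherwise report Connecting with a queue-picks error picker.
	a.cc.UpdateState(balancer.State{
		ConnectivityState: connectivity.Connecting,
		Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
	})
}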

View File

@ -22,7 +22,6 @@ import (
"testing"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/xds/internal/testutils"
)
type s struct {
@ -32,7 +31,3 @@ type s struct {
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
func init() {
NewRandomWRR = testutils.NewTestWRR
}

View File

@ -35,6 +35,7 @@ import (
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/balancer/lrs"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
xdsclient "google.golang.org/grpc/xds/internal/client"
)
@ -50,6 +51,7 @@ type localityConfig struct {
// manages all localities using a balancerGroup.
type balancerGroupWithConfig struct {
bg *balancergroup.BalancerGroup
stateAggregator *weightedaggregator.Aggregator
configs map[internal.LocalityID]*localityConfig
}
@ -141,10 +143,12 @@ func (edsImpl *edsBalancerImpl) handleChildPolicy(name string, config json.RawMe
// switching sub-balancers (keep old balancer around until new
// balancer becomes ready).
bgwc.bg.Remove(id)
bgwc.bg.Add(id, config.weight, edsImpl.subBalancerBuilder)
bgwc.bg.Add(id, edsImpl.subBalancerBuilder)
bgwc.bg.UpdateClientConnState(id, balancer.ClientConnState{
ResolverState: resolver.State{Addresses: config.addrs},
})
// There's no need to manually update the picker here, because the new
// sub-balancer will send its picker later.
}
}
}
@ -233,8 +237,11 @@ func (edsImpl *edsBalancerImpl) handleEDSResponse(edsResp xdsclient.EndpointsUpd
// time this priority is received). We don't start it here. It may
// be started when necessary (e.g. when higher is down, or if it's a
// new lowest priority).
ccPriorityWrapper := edsImpl.ccWrapperWithPriority(priority)
stateAggregator := weightedaggregator.New(ccPriorityWrapper, edsImpl.logger, newRandomWRR)
bgwc = &balancerGroupWithConfig{
bg: balancergroup.New(edsImpl.ccWrapperWithPriority(priority), edsImpl.loadStore, edsImpl.logger),
bg: balancergroup.New(ccPriorityWrapper, stateAggregator, edsImpl.loadStore, edsImpl.logger),
stateAggregator: stateAggregator,
configs: make(map[internal.LocalityID]*localityConfig),
}
edsImpl.priorityToLocalities[priority] = bgwc
@ -270,6 +277,7 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup
// for the same priority. It's used to delete localities that are removed in
// the new EDS response.
newLocalitiesSet := make(map[internal.LocalityID]struct{})
var rebuildStateAndPicker bool
for _, locality := range newLocalities {
// One balancer for each locality.
@ -308,7 +316,8 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup
config, ok := bgwc.configs[lid]
if !ok {
// A new balancer, add it to balancer group and balancer map.
bgwc.bg.Add(lid, newWeight, edsImpl.subBalancerBuilder)
bgwc.stateAggregator.Add(lid, newWeight)
bgwc.bg.Add(lid, edsImpl.subBalancerBuilder)
config = &localityConfig{
weight: newWeight,
}
@ -331,7 +340,8 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup
if weightChanged {
config.weight = newWeight
bgwc.bg.ChangeWeight(lid, newWeight)
bgwc.stateAggregator.UpdateWeight(lid, newWeight)
rebuildStateAndPicker = true
}
if addrsChanged {
@ -345,11 +355,17 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup
// Delete localities that are removed in the latest response.
for lid := range bgwc.configs {
if _, ok := newLocalitiesSet[lid]; !ok {
bgwc.stateAggregator.Remove(lid)
bgwc.bg.Remove(lid)
delete(bgwc.configs, lid)
edsImpl.logger.Infof("Locality %v deleted", lid)
rebuildStateAndPicker = true
}
}
if rebuildStateAndPicker {
bgwc.stateAggregator.BuildAndUpdate()
}
}
// handleSubConnStateChange handles the state change and update pickers accordingly.
@ -429,6 +445,7 @@ func (edsImpl *edsBalancerImpl) newSubConn(priority priorityType, addrs []resolv
func (edsImpl *edsBalancerImpl) close() {
for _, bgwc := range edsImpl.priorityToLocalities {
if bg := bgwc.bg; bg != nil {
bgwc.stateAggregator.Stop()
bg.Close()
}
}
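A condensed sketch of the batching pattern introduced above (the helper name is hypothetical; the aggregator API is the one added in this commit): several aggregator mutations are applied first, and a single BuildAndUpdate pushes one aggregated picker for the whole EDS response.

// applyLocalityDiff is a hypothetical condensation of the pattern used in
// handleEDSResponsePerPriority: batch weight updates and removals, then
// rebuild the aggregated state/picker once.
func applyLocalityDiff(gator *weightedaggregator.Aggregator, weightChanges map[internal.LocalityID]uint32, removed []internal.LocalityID) {
	var rebuild bool
	for id, w := range weightChanges {
		gator.UpdateWeight(id, w) // does not push an update by itself
		rebuild = true
	}
	for _, id := range removed {
		gator.Remove(id) // future updates for this id are ignored
		rebuild = true
	}
	if rebuild {
		gator.BuildAndUpdate() // one picker/state update for the whole batch
	}
}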

View File

@ -104,6 +104,7 @@ func (edsImpl *edsBalancerImpl) startPriority(priority priorityType) {
// currently avoided by handling balancer update in a goroutine (the run
// goroutine in the parent eds balancer). When priority balancer is split
// into its own, this asynchronous state handling needs to be copied.
p.stateAggregator.Start()
p.bg.Start()
// startPriority can be called when
// 1. first EDS resp, start p0
@ -191,7 +192,9 @@ func (edsImpl *edsBalancerImpl) handlePriorityWithNewStateReady(priority priorit
edsImpl.logger.Infof("Switching priority from %v to %v, because latter became Ready", edsImpl.priorityInUse, priority)
edsImpl.priorityInUse = priority
for i := priority.nextLower(); !i.lowerThan(edsImpl.priorityLowest); i = i.nextLower() {
edsImpl.priorityToLocalities[i].bg.Close()
bgwc := edsImpl.priorityToLocalities[i]
bgwc.stateAggregator.Stop()
bgwc.bg.Close()
}
return true
}

View File

@ -46,7 +46,6 @@ func init() {
for i := 0; i < testBackendAddrsCount; i++ {
testEndpointAddrs = append(testEndpointAddrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
}
balancergroup.NewRandomWRR = testutils.NewTestWRR
balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
}

View File

@ -0,0 +1,264 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package weightedaggregator implements a state aggregator for the
// weighted_target balancer.
//
// This is a separate package so it can be shared by weighted_target and eds.
// The eds balancer will be refactored to use weighted_target directly. After
// that, all functions and structs in this package can be moved to package
// weightedtarget and unexported.
package weightedaggregator
import (
"fmt"
"sync"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/xds/internal"
)
type weightedPickerState struct {
weight uint32
state balancer.State
// stateToAggregate is the connectivity state used only for state
// aggregation. It could be different from state.ConnectivityState. For
// example when a sub-balancer transitions from TransientFailure to
// connecting, state.ConnectivityState is Connecting, but stateToAggregate
// is still TransientFailure.
stateToAggregate connectivity.State
}
func (s *weightedPickerState) String() string {
return fmt.Sprintf("weight:%v,picker:%p,state:%v,stateToAggregate:%v", s.weight, s.state.Picker, s.state.ConnectivityState, s.stateToAggregate)
}
// Aggregator is the weighted balancer state aggregator.
type Aggregator struct {
cc balancer.ClientConn
logger *grpclog.PrefixLogger
newWRR func() wrr.WRR
mu sync.Mutex
// If started is false, no updates should be sent to the parent cc. A closed
// sub-balancer could still send pickers to this aggregator. This makes sure
// that no updates will be forwarded to parent when the whole balancer group
// and states aggregator is closed.
started bool
// All balancer IDs exist as keys in this map, even if balancer group is not
// started.
//
// If an ID is not in map, it's either removed or never added.
idToPickerState map[internal.LocalityID]*weightedPickerState
}
// New creates a new weighted balancer state aggregator.
func New(cc balancer.ClientConn, logger *grpclog.PrefixLogger, newWRR func() wrr.WRR) *Aggregator {
return &Aggregator{
cc: cc,
logger: logger,
newWRR: newWRR,
idToPickerState: make(map[internal.LocalityID]*weightedPickerState),
}
}
// Start starts the aggregator. It can be called after Stop to restart the
// aggregator.
func (wbsa *Aggregator) Start() {
wbsa.mu.Lock()
defer wbsa.mu.Unlock()
wbsa.started = true
}
// Stop stops the aggregator. When the aggregator is stopped, it won't call
// the parent ClientConn to update balancer state.
func (wbsa *Aggregator) Stop() {
wbsa.mu.Lock()
defer wbsa.mu.Unlock()
wbsa.started = false
wbsa.clearStates()
}
// Add adds a sub-balancer state with the given weight. It adds a placeholder,
// and waits for the real sub-balancer to update its state.
func (wbsa *Aggregator) Add(id internal.LocalityID, weight uint32) {
wbsa.mu.Lock()
defer wbsa.mu.Unlock()
wbsa.idToPickerState[id] = &weightedPickerState{
weight: weight,
// Start everything in CONNECTING, so if one of the sub-balancers
// reports TransientFailure, the RPCs will still wait for the other
// sub-balancers.
state: balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
},
stateToAggregate: connectivity.Connecting,
}
}
// Remove removes the sub-balancer state. Future updates from this sub-balancer,
// if any, will be ignored.
func (wbsa *Aggregator) Remove(id internal.LocalityID) {
wbsa.mu.Lock()
defer wbsa.mu.Unlock()
if _, ok := wbsa.idToPickerState[id]; !ok {
return
}
// Remove id and picker from picker map. This also results in future updates
// for this ID to be ignored.
delete(wbsa.idToPickerState, id)
}
// UpdateWeight updates the weight for the given id. Note that this doesn't
// trigger an update to the parent ClientConn. The caller should decide when
// it's necessary, and call BuildAndUpdate.
func (wbsa *Aggregator) UpdateWeight(id internal.LocalityID, newWeight uint32) {
wbsa.mu.Lock()
defer wbsa.mu.Unlock()
pState, ok := wbsa.idToPickerState[id]
if !ok {
return
}
pState.weight = newWeight
}
// UpdateState is called to report a balancer state change from a sub-balancer.
// It's usually called by the balancer group.
//
// It calls parent ClientConn's UpdateState with the new aggregated state.
func (wbsa *Aggregator) UpdateState(id internal.LocalityID, newState balancer.State) {
wbsa.mu.Lock()
defer wbsa.mu.Unlock()
oldState, ok := wbsa.idToPickerState[id]
if !ok {
// Every sub-balancer's state starts with an entry in idToPickerState. If the
// ID is not in the map, it's either been removed, or never existed.
return
}
if !(oldState.state.ConnectivityState == connectivity.TransientFailure && newState.ConnectivityState == connectivity.Connecting) {
// If old state is TransientFailure, and new state is Connecting, don't
// update the state, to prevent the aggregated state from being always
// CONNECTING. Otherwise, stateToAggregate is the same as
// state.ConnectivityState.
oldState.stateToAggregate = newState.ConnectivityState
}
oldState.state = newState
if !wbsa.started {
return
}
wbsa.cc.UpdateState(wbsa.build())
}
// clearStates resets everything to the init state (Connecting) but keeps the
// entries in the map (to keep the weights).
//
// Caller must hold wbsa.mu.
func (wbsa *Aggregator) clearStates() {
for _, pState := range wbsa.idToPickerState {
pState.state = balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
}
pState.stateToAggregate = connectivity.Connecting
}
}
// BuildAndUpdate combines the sub-state from each sub-balancer into one state,
// and sends it to the parent ClientConn.
func (wbsa *Aggregator) BuildAndUpdate() {
wbsa.mu.Lock()
defer wbsa.mu.Unlock()
if !wbsa.started {
return
}
wbsa.cc.UpdateState(wbsa.build())
}
// build combines sub-states into one.
//
// Caller must hold wbsa.mu.
func (wbsa *Aggregator) build() balancer.State {
wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState)
m := wbsa.idToPickerState
var readyN, connectingN int
readyPickerWithWeights := make([]weightedPickerState, 0, len(m))
for _, ps := range m {
switch ps.stateToAggregate {
case connectivity.Ready:
readyN++
readyPickerWithWeights = append(readyPickerWithWeights, *ps)
case connectivity.Connecting:
connectingN++
}
}
var aggregatedState connectivity.State
switch {
case readyN > 0:
aggregatedState = connectivity.Ready
case connectingN > 0:
aggregatedState = connectivity.Connecting
default:
aggregatedState = connectivity.TransientFailure
}
// Make sure picker's return error is consistent with the aggregatedState.
var picker balancer.Picker
switch aggregatedState {
case connectivity.TransientFailure:
picker = base.NewErrPicker(balancer.ErrTransientFailure)
case connectivity.Connecting:
picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable)
default:
picker = newWeightedPickerGroup(readyPickerWithWeights, wbsa.newWRR)
}
return balancer.State{ConnectivityState: aggregatedState, Picker: picker}
}
type weightedPickerGroup struct {
w wrr.WRR
}
// newWeightedPickerGroup takes pickers with weights, and groups them into one
// picker.
//
// Note it only takes ready pickers. The slice shouldn't contain non-ready
// pickers.
func newWeightedPickerGroup(readyWeightedPickers []weightedPickerState, newWRR func() wrr.WRR) *weightedPickerGroup {
w := newWRR()
for _, ps := range readyWeightedPickers {
w.Add(ps.state.Picker, int64(ps.weight))
}
return &weightedPickerGroup{
w: w,
}
}
func (pg *weightedPickerGroup) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
p, ok := pg.w.Next().(balancer.Picker)
if !ok {
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
return p.Pick(info)
}
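A small usage sketch of the aggregator's lifecycle (a hypothetical driver written as if inside package weightedaggregator; the locality IDs and pickers are passed in as placeholders): entries start as Connecting, sub-balancer states arrive through UpdateState, and weight changes are flushed explicitly with BuildAndUpdate. With two Ready children at weights 2 and 1, the resulting group picker delegates to their pickers in roughly a 2:1 ratio.

// driveAggregator is a hypothetical driver showing the expected call sequence.
func driveAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLogger,
	idA, idB internal.LocalityID, pickerA, pickerB balancer.Picker) {
	gator := New(cc, logger, wrr.NewRandom)
	gator.Start()

	// Placeholders are added first; both report Connecting until updated.
	gator.Add(idA, 2)
	gator.Add(idB, 1)

	// Normally the balancer group forwards sub-balancer states here; each
	// call may push a new aggregated picker/state to cc.
	gator.UpdateState(idA, balancer.State{ConnectivityState: connectivity.Ready, Picker: pickerA})
	gator.UpdateState(idB, balancer.State{ConnectivityState: connectivity.Ready, Picker: pickerB})

	// Weight changes alone don't push an update; flush explicitly.
	gator.UpdateWeight(idA, 3)
	gator.BuildAndUpdate()

	// After Stop, further UpdateState calls are not forwarded to cc.
	gator.Stop()
}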

View File

@ -24,17 +24,22 @@ import (
"fmt"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/hierarchy"
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
)
const weightedTargetName = "weighted_target_experimental"
// newRandomWRR is the WRR constructor used to pick sub-pickers from
// sub-balancers. It's to be modified in tests.
var newRandomWRR = wrr.NewRandom
func init() {
balancer.Register(&weightedTargetBB{})
}
@ -43,8 +48,10 @@ type weightedTargetBB struct{}
func (wt *weightedTargetBB) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
b := &weightedTargetBalancer{}
b.logger = prefixLogger((b))
b.bg = balancergroup.New(cc, nil, b.logger)
b.logger = prefixLogger(b)
b.stateAggregator = weightedaggregator.New(cc, b.logger, newRandomWRR)
b.stateAggregator.Start()
b.bg = balancergroup.New(cc, b.stateAggregator, nil, b.logger)
b.bg.Start()
b.logger.Infof("Created")
return b
@ -67,6 +74,7 @@ type weightedTargetBalancer struct {
// dependencies are removed from the balancerGroup, this package will not
// have any dependencies on xds code.
bg *balancergroup.BalancerGroup
stateAggregator *weightedaggregator.Aggregator
targets map[string]target
}
@ -86,10 +94,17 @@ func (w *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
}
addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
// Remove sub-balancers that are not in the new config.
var rebuildStateAndPicker bool
// Remove sub-pickers and sub-balancers that are not in the new config.
for name := range w.targets {
if _, ok := newConfig.Targets[name]; !ok {
w.bg.Remove(makeLocalityFromName(name))
l := makeLocalityFromName(name)
w.stateAggregator.Remove(l)
w.bg.Remove(l)
// Trigger a state/picker update, because we don't want `ClientConn`
// to pick this sub-balancer anymore.
rebuildStateAndPicker = true
}
}
@ -103,11 +118,18 @@ func (w *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
oldT, ok := w.targets[name]
if !ok {
// If this is a new sub-balancer, add it.
w.bg.Add(l, newT.Weight, balancer.Get(newT.ChildPolicy.Name))
// If this is a new sub-balancer, add weights to the picker map.
w.stateAggregator.Add(l, newT.Weight)
// Then add to the balancer group.
w.bg.Add(l, balancer.Get(newT.ChildPolicy.Name))
// Don't trigger a state/picker update. Wait for the new sub-balancer
// to send its updates.
} else if newT.Weight != oldT.Weight {
// If this is an existing sub-balancer, update weight if necessary.
w.bg.ChangeWeight(l, newT.Weight)
w.stateAggregator.UpdateWeight(l, newT.Weight)
// Trigger a state/picker update, because we want `ClientConn` to do picks
// with the new weights now.
rebuildStateAndPicker = true
}
// Forwards all the update:
@ -127,6 +149,10 @@ func (w *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
}
w.targets = newConfig.Targets
if rebuildStateAndPicker {
w.stateAggregator.BuildAndUpdate()
}
return nil
}
@ -139,13 +165,6 @@ func (w *weightedTargetBalancer) UpdateSubConnState(sc balancer.SubConn, state b
}
func (w *weightedTargetBalancer) Close() {
w.stateAggregator.Stop()
w.bg.Close()
}
func (w *weightedTargetBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
w.logger.Errorf("UpdateSubConnState should be called instead of HandleSubConnStateChange")
}
func (w *weightedTargetBalancer) HandleResolvedAddrs([]resolver.Address, error) {
w.logger.Errorf("UpdateClientConnState should be called instead of HandleResolvedAddrs")
}