xds: rename subBalancerWithConfig to subBalancerWrapper (#3608)
@@ -35,16 +35,16 @@ import (
 	"google.golang.org/grpc/xds/internal/balancer/lrs"
 )
 
-// subBalancerWithConfig is used to keep the configurations that will be used to start
+// subBalancerWrapper is used to keep the configurations that will be used to start
 // the underlying balancer. It can be called to start/stop the underlying
 // balancer.
 //
 // When the config changes, it will pass the update to the underlying balancer
 // if it exists.
 //
-// TODO: rename to subBalanceWrapper (and move to a separate file?)
-type subBalancerWithConfig struct {
-	// subBalancerWithConfig is passed to the sub-balancer as a ClientConn
+// TODO: move to a separate file?
+type subBalancerWrapper struct {
+	// subBalancerWrapper is passed to the sub-balancer as a ClientConn
 	// wrapper, only to keep the state and picker. When sub-balancer is
 	// restarted while in cache, the picker needs to be resent.
 	//
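For orientation, a minimal sketch (not the gRPC source; names simplified and several fields omitted) of the wrapper pattern this type implements: it embeds balancer.ClientConn so that every method it does not override is forwarded to the parent unchanged, and it intercepts only the calls the balancer group needs to observe.

package sketch

import (
	"sync"

	"google.golang.org/grpc/balancer"
)

// wrappedBalancer is a hypothetical stand-in for subBalancerWrapper; the
// real type also carries a locality id and a back-pointer to the group.
type wrappedBalancer struct {
	// Embedded so every ClientConn method not overridden below is
	// forwarded to the parent unchanged.
	balancer.ClientConn

	builder  balancer.Builder           // used to (re)build the child
	balancer balancer.Balancer          // nil while the child is stopped
	ccState  *balancer.ClientConnState  // last config, replayed on restart

	mu    sync.Mutex
	state balancer.State // last state the child reported
}

// UpdateState intercepts the child's reports so the latest picker can be
// cached and resent if the child is later revived from the cache.
func (w *wrappedBalancer) UpdateState(s balancer.State) {
	w.mu.Lock()
	w.state = s
	w.mu.Unlock()
	// The real wrapper forwards s to the balancer group here.
}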
@@ -74,7 +74,7 @@ type subBalancerWithConfig struct {
 }
 
 // UpdateState overrides balancer.ClientConn, to keep state and picker.
-func (sbc *subBalancerWithConfig) UpdateState(state balancer.State) {
+func (sbc *subBalancerWrapper) UpdateState(state balancer.State) {
 	sbc.mu.Lock()
 	sbc.state = state
 	sbc.group.updateBalancerState(sbc.id, state)
@@ -83,11 +83,11 @@ func (sbc *subBalancerWithConfig) UpdateState(state balancer.State) {
 
 // NewSubConn overrides balancer.ClientConn, so balancer group can keep track of
 // the relation between subconns and sub-balancers.
-func (sbc *subBalancerWithConfig) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
+func (sbc *subBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
 	return sbc.group.newSubConn(sbc, addrs, opts)
 }
 
-func (sbc *subBalancerWithConfig) updateBalancerStateWithCachedPicker() {
+func (sbc *subBalancerWrapper) updateBalancerStateWithCachedPicker() {
 	sbc.mu.Lock()
 	if sbc.state.Picker != nil {
 		sbc.group.updateBalancerState(sbc.id, sbc.state)
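A child revived from the cache has not produced a new picker yet, which is why updateBalancerStateWithCachedPicker resends the cached one. Extending the wrappedBalancer sketch above (forward is an invented stand-in for the group callback):

// resendCachedState replays the last known state, but only if the child
// ever produced a picker; a nil picker means there is nothing to replay.
func (w *wrappedBalancer) resendCachedState(forward func(balancer.State)) {
	w.mu.Lock()
	if w.state.Picker != nil {
		forward(w.state)
	}
	w.mu.Unlock()
}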
@@ -95,7 +95,7 @@ func (sbc *subBalancerWithConfig) updateBalancerStateWithCachedPicker() {
 	sbc.mu.Unlock()
 }
 
-func (sbc *subBalancerWithConfig) startBalancer() {
+func (sbc *subBalancerWrapper) startBalancer() {
 	b := sbc.builder.Build(sbc, balancer.BuildOptions{})
 	sbc.group.logger.Infof("Created child policy %p of type %v", b, sbc.builder.Name())
 	sbc.balancer = b
@@ -104,7 +104,7 @@ func (sbc *subBalancerWithConfig) startBalancer() {
 	}
 }
 
-func (sbc *subBalancerWithConfig) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
+func (sbc *subBalancerWrapper) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
 	b := sbc.balancer
 	if b == nil {
 		// This sub-balancer was closed. This can happen when EDS removes a
@@ -116,7 +116,7 @@ func (sbc *subBalancerWithConfig) updateSubConnState(sc balancer.SubConn, state
 	b.UpdateSubConnState(sc, state)
 }
 
-func (sbc *subBalancerWithConfig) updateClientConnState(s balancer.ClientConnState) error {
+func (sbc *subBalancerWrapper) updateClientConnState(s balancer.ClientConnState) error {
 	sbc.ccState = &s
 	b := sbc.balancer
 	if b == nil {
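While a removed child sits in the cache, the wrapper's balancer field is nil, so every forwarding method checks it before delegating; updateClientConnState additionally saves the config so it can be replayed when startBalancer rebuilds the child. A hedged sketch of that buffering, again extending wrappedBalancer:

// updateClientConnState buffers the latest config and forwards it only
// if a live child exists; otherwise it is replayed when the child is
// next built.
func (w *wrappedBalancer) updateClientConnState(s balancer.ClientConnState) error {
	w.ccState = &s
	if w.balancer == nil {
		// Child is stopped (possibly cached); nothing to forward yet.
		return nil
	}
	return w.balancer.UpdateClientConnState(s)
}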
@@ -133,7 +133,7 @@ func (sbc *subBalancerWithConfig) updateClientConnState(s balancer.ClientConnSta
 	return b.UpdateClientConnState(s)
 }
 
-func (sbc *subBalancerWithConfig) resolverError(err error) {
+func (sbc *subBalancerWrapper) resolverError(err error) {
 	b := sbc.balancer
 	if b == nil {
 		// This sub-balancer was closed. This should never happen because
@@ -149,7 +149,7 @@ func (sbc *subBalancerWithConfig) resolverError(err error) {
 	b.ResolverError(err)
 }
 
-func (sbc *subBalancerWithConfig) stopBalancer() {
+func (sbc *subBalancerWrapper) stopBalancer() {
 	sbc.balancer.Close()
 	sbc.balancer = nil
 }
@@ -206,7 +206,7 @@ type BalancerGroup struct {
 	// to sub-balancers after they are closed.
 	outgoingMu         sync.Mutex
 	outgoingStarted    bool
-	idToBalancerConfig map[internal.LocalityID]*subBalancerWithConfig
+	idToBalancerConfig map[internal.LocalityID]*subBalancerWrapper
 	// Cache for sub-balancers when they are removed.
 	balancerCache *cache.TimeoutCache
 
@@ -233,7 +233,7 @@ type BalancerGroup struct {
 	// from sub-balancers after they are closed.
 	incomingMu      sync.Mutex
 	incomingStarted bool // This boolean only guards calls back to ClientConn.
-	scToSubBalancer map[balancer.SubConn]*subBalancerWithConfig
+	scToSubBalancer map[balancer.SubConn]*subBalancerWrapper
 	// All balancer IDs exist as keys in this map, even if balancer group is not
 	// started.
 	//
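The two mutexes in BalancerGroup guard opposite directions of traffic: outgoingMu covers calls flowing from the parent ClientConn down to sub-balancers (and the idToBalancerConfig map), while incomingMu covers calls flowing from sub-balancers back up to the parent (and the scToSubBalancer map). A hypothetical sketch of that split; field names other than the mutexes are invented:

// directionalGroup shows only the locking layout; the real BalancerGroup
// also holds pickers, weights, the timeout cache, and start flags.
type directionalGroup struct {
	outgoingMu sync.Mutex // parent -> children: config updates, add/remove
	children   map[string]*wrappedBalancer

	incomingMu sync.Mutex // children -> parent: new SubConns, picker updates
	scOwners   map[balancer.SubConn]*wrappedBalancer
}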
@@ -255,9 +255,9 @@ func New(cc balancer.ClientConn, loadStore lrs.Store, logger *grpclog.PrefixLogg
 		logger:    logger,
 		loadStore: loadStore,
 
-		idToBalancerConfig: make(map[internal.LocalityID]*subBalancerWithConfig),
+		idToBalancerConfig: make(map[internal.LocalityID]*subBalancerWrapper),
 		balancerCache:      cache.NewTimeoutCache(DefaultSubBalancerCloseTimeout),
-		scToSubBalancer:    make(map[balancer.SubConn]*subBalancerWithConfig),
+		scToSubBalancer:    make(map[balancer.SubConn]*subBalancerWrapper),
 		idToPickerState:    make(map[internal.LocalityID]*pickerState),
 	}
 }
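A hedged sketch of how a parent policy might wire the group together, using only the API visible in this diff's hunk headers (New and Add). It is assumed to live in the same package; the locality values, the weight of 1, and the childBuilder parameter are illustrative placeholders:

// wireGroup is a hypothetical helper, not part of this commit.
func wireGroup(cc balancer.ClientConn, loadStore lrs.Store, logger *grpclog.PrefixLogger, childBuilder balancer.Builder) *BalancerGroup {
	bg := New(cc, loadStore, logger)
	// Register one child policy for a locality; Add reuses a cached
	// child if one is available.
	bg.Add(internal.LocalityID{Region: "us-central1", Zone: "a"}, 1, childBuilder)
	return bg
}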
@@ -313,12 +313,12 @@ func (bg *BalancerGroup) Add(id internal.LocalityID, weight uint32, builder bala
 
 	// Store data in static map, and then check to see if bg is started.
 	bg.outgoingMu.Lock()
-	var sbc *subBalancerWithConfig
+	var sbc *subBalancerWrapper
 	// If outgoingStarted is true, search in the cache. Otherwise, cache is
 	// guaranteed to be empty, searching is unnecessary.
 	if bg.outgoingStarted {
 		if old, ok := bg.balancerCache.Remove(id); ok {
-			sbc, _ = old.(*subBalancerWithConfig)
+			sbc, _ = old.(*subBalancerWrapper)
 			if sbc != nil && sbc.builder != builder {
 				// If the sub-balancer in cache was built with a different
 				// balancer builder, don't use it, cleanup this old-balancer,
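The timeout cache hands back entries as empty interfaces, so revival needs a type assertion, and a cached child is reused only if it was built by the same builder; a mismatched child is discarded and a fresh one created. A compact sketch of that decision, with an invented helper name and the simplified types from above:

// reuseFromCache mirrors the reuse check in Add: absent or wrong-typed
// entries and builder mismatches both force a fresh child.
func reuseFromCache(cached interface{}, builder balancer.Builder) (*wrappedBalancer, bool) {
	w, _ := cached.(*wrappedBalancer) // nil if absent or the wrong type
	if w == nil || w.builder != builder {
		return nil, false
	}
	return w, true
}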
@@ -336,7 +336,7 @@ func (bg *BalancerGroup) Add(id internal.LocalityID, weight uint32, builder bala
 		}
 	}
 	if sbc == nil {
-		sbc = &subBalancerWithConfig{
+		sbc = &subBalancerWrapper{
 			ClientConn: bg.cc,
 			id:         id,
 			group:      bg,
@@ -401,7 +401,7 @@ func (bg *BalancerGroup) Remove(id internal.LocalityID) {
 
 // bg.remove(id) doesn't do cleanup for the sub-balancer. This function does
 // cleanup after the timeout.
-func (bg *BalancerGroup) cleanupSubConns(config *subBalancerWithConfig) {
+func (bg *BalancerGroup) cleanupSubConns(config *subBalancerWrapper) {
 	bg.incomingMu.Lock()
 	// Remove SubConns. This is only done after the balancer is
 	// actually closed.
@@ -501,7 +501,7 @@ func (bg *BalancerGroup) ResolverError(err error) {
 // from map. Delete sc from the map only when state changes to Shutdown. Since
 // it's just forwarding the action, there's no need for a removeSubConn()
 // wrapper function.
-func (bg *BalancerGroup) newSubConn(config *subBalancerWithConfig, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
+func (bg *BalancerGroup) newSubConn(config *subBalancerWrapper, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
 	// NOTE: if balancer with id was already removed, this should also return
 	// error. But since we call balancer.stopBalancer when removing the balancer, this
 	// shouldn't happen.
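Besides forwarding, newSubConn's main job is bookkeeping: it records which wrapper owns the new SubConn so later UpdateSubConnState calls can be routed back to the right child. A sketch using the simplified directionalGroup and wrappedBalancer types from above (resolver is google.golang.org/grpc/resolver):

// newSubConn creates the SubConn through the parent ClientConn and
// remembers its owner for routing future state updates.
func (g *directionalGroup) newSubConn(parent balancer.ClientConn, owner *wrappedBalancer, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
	sc, err := parent.NewSubConn(addrs, opts)
	if err != nil {
		return nil, err
	}
	g.incomingMu.Lock()
	g.scOwners[sc] = owner // route future state updates to this child
	g.incomingMu.Unlock()
	return sc, nil
}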