eds: move edsBalancer to the package edsbalancer (#3294)
edsBalancer (the old xds balancer) lived in `package balancer`, one level above the EDS implementation. It is a thin wrapper around the EDS implementation (and, in the future, around fallback). This change moves the thin wrapper into `package edsbalancer` and renames some structs (for example, the exported `EDSBalancer` becomes the unexported `edsBalancerImpl`, and `XDSConfig` becomes `EDSConfig`).
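
For orientation, the split this commit establishes looks roughly like the sketch below. This is a minimal, self-contained approximation, not the actual source: the real constructor takes a balancer.ClientConn and an lrs.Store, and EDSUpdate here is a stand-in for xdsclient.EDSUpdate.

// Sketch of the wrapper/impl split in package edsbalancer (simplified).
package edsbalancer

// EDSUpdate is a stand-in for xdsclient.EDSUpdate.
type EDSUpdate struct{}

// edsBalancerImplInterface is the contract between the thin wrapper
// (edsBalancer) and the implementation; tests satisfy it with a fake.
type edsBalancerImplInterface interface {
	HandleEDSResponse(edsResp *EDSUpdate)
	Close()
}

// edsBalancerImpl is the renamed implementation (formerly the exported
// EDSBalancer); it does the actual EDS-based load balancing.
type edsBalancerImpl struct{}

func (e *edsBalancerImpl) HandleEDSResponse(edsResp *EDSUpdate) {}
func (e *edsBalancerImpl) Close()                               {}

// newEDSBalancer is a package-level constructor variable so tests can swap
// in a fake implementation without touching the wrapper.
var newEDSBalancer = func() edsBalancerImplInterface { return &edsBalancerImpl{} }

// edsBalancer is the thin wrapper registered with gRPC; it owns the xDS
// client plumbing and delegates the load balancing to edsImpl.
type edsBalancer struct {
	edsImpl edsBalancerImplInterface
}

Keeping the constructor behind a variable is what allows the test setup() helper in the diff below to substitute fakeEDSBalancer for the real implementation.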
@@ -23,8 +23,7 @@
 package experimental
 
 import (
-	_ "google.golang.org/grpc/xds/internal/balancer"             // Register the xds_balancer
-	_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the cds balancer
-	_ "google.golang.org/grpc/xds/internal/resolver"             // Register the xds_resolver
-	_ "google.golang.org/grpc/xds/internal/resolver/old"         // Register the old xds_resolver
+	_ "google.golang.org/grpc/xds/internal/balancer"     // Register the balancers.
+	_ "google.golang.org/grpc/xds/internal/resolver"     // Register the xds_resolver
+	_ "google.golang.org/grpc/xds/internal/resolver/old" // Register the old xds_resolver
 )

xds/internal/balancer/balancer.go (new file, 24 lines)

@@ -0,0 +1,24 @@
+/*
+ *
+ * Copyright 2020 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package balancer
+
+import (
+	_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the CDS balancer
+	_ "google.golang.org/grpc/xds/internal/balancer/edsbalancer" // Register the EDS balancer
+)
@@ -30,9 +30,9 @@ import (
 	"google.golang.org/grpc/internal/buffer"
 	"google.golang.org/grpc/resolver"
 	"google.golang.org/grpc/serviceconfig"
+	"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
 
 	xdsinternal "google.golang.org/grpc/xds/internal"
-	xdsbalancer "google.golang.org/grpc/xds/internal/balancer"
 	xdsclient "google.golang.org/grpc/xds/internal/client"
 )
 

@@ -229,7 +229,7 @@ func (b *cdsBalancer) run() {
 					break
 				}
 			}
-			lbCfg := &xdsbalancer.XDSConfig{EDSServiceName: update.cds.ServiceName}
+			lbCfg := &edsbalancer.EDSConfig{EDSServiceName: update.cds.ServiceName}
 			if update.cds.EnableLRS {
 				// An empty string here indicates that the edsBalancer
 				// should use the same xDS server for load reporting as
@@ -31,7 +31,7 @@ import (
 	"google.golang.org/grpc/resolver"
 	"google.golang.org/grpc/serviceconfig"
 	xdsinternal "google.golang.org/grpc/xds/internal"
-	xdsbalancer "google.golang.org/grpc/xds/internal/balancer"
+	"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
 	xdsclient "google.golang.org/grpc/xds/internal/client"
 	"google.golang.org/grpc/xds/internal/testutils"
 	"google.golang.org/grpc/xds/internal/testutils/fakeclient"

@@ -194,7 +194,7 @@ func cdsCCS(cluster string, xdsClient interface{}) balancer.ClientConnState {
 // edsCCS is a helper function to construct a good update passed from the
 // cdsBalancer to the edsBalancer.
 func edsCCS(service string, enableLRS bool, xdsClient interface{}) balancer.ClientConnState {
-	lbCfg := &xdsbalancer.XDSConfig{EDSServiceName: service}
+	lbCfg := &edsbalancer.EDSConfig{EDSServiceName: service}
 	if enableLRS {
 		lbCfg.LrsLoadReportingServerName = new(string)
 	}
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package balancer
+package edsbalancer
 
 import (
 	"encoding/json"

@@ -25,9 +25,9 @@ import (
 	"google.golang.org/grpc/serviceconfig"
 )
 
-// XDSConfig represents the loadBalancingConfig section of the service config
-// for xDS balancers.
-type XDSConfig struct {
+// EDSConfig represents the loadBalancingConfig section of the service config
+// for EDS balancers.
+type EDSConfig struct {
 	serviceconfig.LoadBalancingConfig
 	// BalancerName represents the load balancer to use.
 	BalancerName string

@@ -46,10 +46,10 @@ type XDSConfig struct {
 	LrsLoadReportingServerName *string
 }
 
-// xdsConfigJSON is the intermediate unmarshal result of XDSConfig. ChildPolicy
+// edsConfigJSON is the intermediate unmarshal result of EDSConfig. ChildPolicy
 // and Fallbackspolicy are post-processed, and for each, the first installed
 // policy is kept.
-type xdsConfigJSON struct {
+type edsConfigJSON struct {
 	BalancerName   string
 	ChildPolicy    []*loadBalancingConfig
 	FallbackPolicy []*loadBalancingConfig

@@ -60,8 +60,8 @@ type xdsConfigJSON struct {
 // UnmarshalJSON parses the JSON-encoded byte slice in data and stores it in l.
 // When unmarshalling, we iterate through the childPolicy/fallbackPolicy lists
 // and select the first LB policy which has been registered.
-func (l *XDSConfig) UnmarshalJSON(data []byte) error {
-	var configJSON xdsConfigJSON
+func (l *EDSConfig) UnmarshalJSON(data []byte) error {
+	var configJSON edsConfigJSON
 	if err := json.Unmarshal(data, &configJSON); err != nil {
 		return err
 	}

@@ -87,8 +87,8 @@ func (l *XDSConfig) UnmarshalJSON(data []byte) error {
 }
 
 // MarshalJSON returns a JSON encoding of l.
-func (l *XDSConfig) MarshalJSON() ([]byte, error) {
-	return nil, fmt.Errorf("XDSConfig.MarshalJSON() is unimplemented")
+func (l *EDSConfig) MarshalJSON() ([]byte, error) {
+	return nil, fmt.Errorf("EDSConfig.MarshalJSON() is unimplemented")
 }
 
 // loadBalancingConfig represents a single load balancing config,
@@ -16,8 +16,8 @@
  *
  */
 
-// Package balancer contains xds balancer implementation.
-package balancer
+// Package edsbalancer contains EDS balancer implementation.
+package edsbalancer
 
 import (
 	"context"

@@ -32,7 +32,6 @@ import (
 	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/resolver"
 	"google.golang.org/grpc/serviceconfig"
-	"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
 	"google.golang.org/grpc/xds/internal/balancer/lrs"
 	xdsclient "google.golang.org/grpc/xds/internal/client"
 )

@@ -43,8 +42,8 @@ const (
 )
 
 var (
-	newEDSBalancer = func(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface {
-		return edsbalancer.NewXDSBalancer(cc, loadStore)
+	newEDSBalancer = func(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerImplInterface {
+		return newEDSBalancerImpl(cc, loadStore)
 	}
 )

@@ -66,7 +65,7 @@ func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp
 		xdsClientUpdate: make(chan interface{}),
 	}
 	loadStore := lrs.NewStore()
-	x.xdsLB = newEDSBalancer(x.cc, loadStore)
+	x.edsImpl = newEDSBalancer(x.cc, loadStore)
 	x.client = newXDSClientWrapper(x.handleEDSUpdate, x.loseContact, x.buildOpts, loadStore)
 	go x.run()
 	return x

@@ -77,18 +76,18 @@ func (b *edsBalancerBuilder) Name() string {
 }
 
 func (b *edsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
-	var cfg XDSConfig
+	var cfg EDSConfig
 	if err := json.Unmarshal(c, &cfg); err != nil {
-		return nil, fmt.Errorf("unable to unmarshal balancer config %s into xds config, error: %v", string(c), err)
+		return nil, fmt.Errorf("unable to unmarshal balancer config %s into EDSConfig, error: %v", string(c), err)
 	}
 	return &cfg, nil
 }
 
-// edsBalancerInterface defines the interface that edsBalancer must implement to
-// communicate with edsBalancer.
+// edsBalancerImplInterface defines the interface that edsBalancerImpl must
+// implement to communicate with edsBalancer.
 //
 // It's implemented by the real eds balancer and a fake testing eds balancer.
-type edsBalancerInterface interface {
+type edsBalancerImplInterface interface {
 	// HandleEDSResponse passes the received EDS message from traffic director to eds balancer.
 	HandleEDSResponse(edsResp *xdsclient.EDSUpdate)
 	// HandleChildPolicy updates the eds balancer the intra-cluster load balancing policy to use.

@@ -101,8 +100,8 @@ type edsBalancerInterface interface {
 
 var _ balancer.V2Balancer = (*edsBalancer)(nil) // Assert that we implement V2Balancer
 
-// edsBalancer manages xdsClient and the actual balancer that does load
-// balancing.
+// edsBalancer manages xdsClient and the actual EDS balancer implementation that
+// does load balancing.
 //
 // It currently has only an edsBalancer. Later, we may add fallback.
 type edsBalancer struct {

@@ -115,9 +114,9 @@ type edsBalancer struct {
 	grpcUpdate      chan interface{}
 	xdsClientUpdate chan interface{}
 
-	client *xdsclientWrapper // may change when passed a different service config
-	config *XDSConfig        // may change when passed a different service config
-	xdsLB  edsBalancerInterface
+	client  *xdsclientWrapper // may change when passed a different service config
+	config  *EDSConfig        // may change when passed a different service config
+	edsImpl edsBalancerImplInterface
 }
 
 // run gets executed in a goroutine once edsBalancer is created. It monitors updates from grpc,

@@ -134,8 +133,8 @@ func (x *edsBalancer) run() {
 			if x.client != nil {
 				x.client.close()
 			}
-			if x.xdsLB != nil {
-				x.xdsLB.Close()
+			if x.edsImpl != nil {
+				x.edsImpl.Close()
 			}
 			return
 		}

@@ -145,11 +144,11 @@ func (x *edsBalancer) run() {
 func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
 	switch u := update.(type) {
 	case *subConnStateUpdate:
-		if x.xdsLB != nil {
-			x.xdsLB.HandleSubConnStateChange(u.sc, u.state.ConnectivityState)
+		if x.edsImpl != nil {
+			x.edsImpl.HandleSubConnStateChange(u.sc, u.state.ConnectivityState)
 		}
 	case *balancer.ClientConnState:
-		cfg, _ := u.BalancerConfig.(*XDSConfig)
+		cfg, _ := u.BalancerConfig.(*EDSConfig)
 		if cfg == nil {
 			// service config parsing failed. should never happen.
 			return

@@ -162,13 +161,13 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
 			return
 		}
 
-		// We will update the xdsLB with the new child policy, if we got a
+		// We will update the edsImpl with the new child policy, if we got a
 		// different one.
-		if x.xdsLB != nil && !reflect.DeepEqual(cfg.ChildPolicy, x.config.ChildPolicy) {
+		if x.edsImpl != nil && !reflect.DeepEqual(cfg.ChildPolicy, x.config.ChildPolicy) {
 			if cfg.ChildPolicy != nil {
-				x.xdsLB.HandleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config)
+				x.edsImpl.HandleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config)
 			} else {
-				x.xdsLB.HandleChildPolicy(roundrobin.Name, nil)
+				x.edsImpl.HandleChildPolicy(roundrobin.Name, nil)
 			}
 		}

@@ -184,7 +183,7 @@ func (x *edsBalancer) handleXDSClientUpdate(update interface{}) {
 	// TODO: this func should accept (*xdsclient.EDSUpdate, error), and process
 	// the error, instead of having a separate loseContact signal.
 	case *xdsclient.EDSUpdate:
-		x.xdsLB.HandleEDSResponse(u)
+		x.edsImpl.HandleEDSResponse(u)
 	case *loseContact:
 		// loseContact can be useful for going into fallback.
 	default:
@@ -14,7 +14,6 @@
  * limitations under the License.
  */
 
-// Package edsbalancer implements a balancer to handle EDS responses.
 package edsbalancer
 
 import (

@@ -51,13 +50,13 @@ type balancerGroupWithConfig struct {
 	configs map[internal.Locality]*localityConfig
 }
 
-// EDSBalancer does load balancing based on the EDS responses. Note that it
+// edsBalancerImpl does load balancing based on the EDS responses. Note that it
 // doesn't implement the balancer interface. It's intended to be used by a high
 // level balancer implementation.
 //
 // The localities are picked as weighted round robin. A configurable child
 // policy is used to manage endpoints in each locality.
-type EDSBalancer struct {
+type edsBalancerImpl struct {
 	cc balancer.ClientConn
 
 	subBalancerBuilder balancer.Builder

@@ -89,9 +88,9 @@ type EDSBalancer struct {
 	innerState balancer.State // The state of the picker without drop support.
 }
 
-// NewXDSBalancer create a new EDSBalancer.
-func NewXDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) *EDSBalancer {
-	xdsB := &EDSBalancer{
+// newEDSBalancerImpl create a new edsBalancerImpl.
+func newEDSBalancerImpl(cc balancer.ClientConn, loadStore lrs.Store) *edsBalancerImpl {
+	edsImpl := &edsBalancerImpl{
 		cc:                 cc,
 		subBalancerBuilder: balancer.Get(roundrobin.Name),
 

@@ -104,7 +103,7 @@ func NewXDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) *EDSBalancer {
 	// response. Otherwise the balancer group will be started with round-robin,
 	// and if users specify a different sub-balancer, all balancers in balancer
 	// group will be closed and recreated when sub-balancer update happens.
-	return xdsB
+	return edsImpl
 }
 
 // HandleChildPolicy updates the child balancers handling endpoints. Child

@@ -112,17 +111,17 @@ func NewXDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) *EDSBalancer {
 // the old child balancer will be used.
 //
 // HandleChildPolicy and HandleEDSResponse must be called by the same goroutine.
-func (xdsB *EDSBalancer) HandleChildPolicy(name string, config json.RawMessage) {
-	if xdsB.subBalancerBuilder.Name() == name {
+func (edsImpl *edsBalancerImpl) HandleChildPolicy(name string, config json.RawMessage) {
+	if edsImpl.subBalancerBuilder.Name() == name {
 		return
 	}
 	newSubBalancerBuilder := balancer.Get(name)
 	if newSubBalancerBuilder == nil {
-		grpclog.Infof("EDSBalancer: failed to find balancer with name %q, keep using %q", name, xdsB.subBalancerBuilder.Name())
+		grpclog.Infof("edsBalancerImpl: failed to find balancer with name %q, keep using %q", name, edsImpl.subBalancerBuilder.Name())
 		return
 	}
-	xdsB.subBalancerBuilder = newSubBalancerBuilder
-	for _, bgwc := range xdsB.priorityToLocalities {
+	edsImpl.subBalancerBuilder = newSubBalancerBuilder
+	for _, bgwc := range edsImpl.priorityToLocalities {
 		if bgwc == nil {
 			continue
 		}

@@ -131,7 +130,7 @@ func (xdsB *EDSBalancer) HandleChildPolicy(name string, config json.RawMessage)
 			// switching sub-balancers (keep old balancer around until new
 			// balancer becomes ready).
 			bgwc.bg.remove(id)
-			bgwc.bg.add(id, config.weight, xdsB.subBalancerBuilder)
+			bgwc.bg.add(id, config.weight, edsImpl.subBalancerBuilder)
 			bgwc.bg.handleResolvedAddrs(id, config.addrs)
 		}
 	}

@@ -139,7 +138,7 @@ func (xdsB *EDSBalancer) HandleChildPolicy(name string, config json.RawMessage)
 
 // updateDrops compares new drop policies with the old. If they are different,
 // it updates the drop policies and send ClientConn an updated picker.
-func (xdsB *EDSBalancer) updateDrops(dropPolicies []xdsclient.OverloadDropConfig) {
+func (edsImpl *edsBalancerImpl) updateDrops(dropPolicies []xdsclient.OverloadDropConfig) {
 	var (
 		newDrops     []*dropper
 		dropsChanged bool

@@ -151,30 +150,30 @@ func (xdsB *EDSBalancer) updateDrops(dropPolicies []xdsclient.OverloadDropConfig
 		)
 		newDrops = append(newDrops, newDropper(numerator, denominator, dropPolicy.Category))
 
-		// The following reading xdsB.drops doesn't need mutex because it can only
+		// The following reading edsImpl.drops doesn't need mutex because it can only
 		// be updated by the code following.
 		if dropsChanged {
 			continue
 		}
-		if i >= len(xdsB.drops) {
+		if i >= len(edsImpl.drops) {
 			dropsChanged = true
 			continue
 		}
-		if oldDrop := xdsB.drops[i]; numerator != oldDrop.numerator || denominator != oldDrop.denominator {
+		if oldDrop := edsImpl.drops[i]; numerator != oldDrop.numerator || denominator != oldDrop.denominator {
 			dropsChanged = true
 		}
 	}
 	if dropsChanged {
-		xdsB.pickerMu.Lock()
-		xdsB.drops = newDrops
-		if xdsB.innerState.Picker != nil {
+		edsImpl.pickerMu.Lock()
+		edsImpl.drops = newDrops
+		if edsImpl.innerState.Picker != nil {
 			// Update picker with old inner picker, new drops.
-			xdsB.cc.UpdateState(balancer.State{
-				ConnectivityState: xdsB.innerState.ConnectivityState,
-				Picker:            newDropPicker(xdsB.innerState.Picker, newDrops, xdsB.loadStore)},
+			edsImpl.cc.UpdateState(balancer.State{
+				ConnectivityState: edsImpl.innerState.ConnectivityState,
+				Picker:            newDropPicker(edsImpl.innerState.Picker, newDrops, edsImpl.loadStore)},
 			)
 		}
-		xdsB.pickerMu.Unlock()
+		edsImpl.pickerMu.Unlock()
 	}
 }

@@ -182,7 +181,7 @@ func (xdsB *EDSBalancer) updateDrops(dropPolicies []xdsclient.OverloadDropConfig
 // SubConns. It also handles drops.
 //
 // HandleChildPolicy and HandleEDSResponse must be called by the same goroutine.
-func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {
+func (edsImpl *edsBalancerImpl) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {
 	// TODO: Unhandled fields from EDS response:
 	// - edsResp.GetPolicy().GetOverprovisioningFactor()
 	// - locality.GetPriority()

@@ -192,7 +191,7 @@ func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {
 	// - socketAddress.GetNamedPort(), socketAddress.GetResolverName()
 	// - resolve endpoint's name with another resolver
 
-	xdsB.updateDrops(edsResp.Drops)
+	edsImpl.updateDrops(edsResp.Drops)
 
 	// Filter out all localities with weight 0.
 	//

@@ -222,7 +221,7 @@ func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {
 			priorityLowest = priority
 		}
 
-		bgwc, ok := xdsB.priorityToLocalities[priority]
+		bgwc, ok := edsImpl.priorityToLocalities[priority]
 		if !ok {
 			// Create balancer group if it's never created (this is the first
 			// time this priority is received). We don't start it here. It may

@@ -230,24 +229,24 @@ func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {
 			// new lowest priority).
 			bgwc = &balancerGroupWithConfig{
 				bg: newBalancerGroup(
-					xdsB.ccWrapperWithPriority(priority), xdsB.loadStore,
+					edsImpl.ccWrapperWithPriority(priority), edsImpl.loadStore,
 				),
 				configs: make(map[internal.Locality]*localityConfig),
 			}
-			xdsB.priorityToLocalities[priority] = bgwc
+			edsImpl.priorityToLocalities[priority] = bgwc
 			priorityChanged = true
 		}
-		xdsB.handleEDSResponsePerPriority(bgwc, newLocalities)
+		edsImpl.handleEDSResponsePerPriority(bgwc, newLocalities)
 	}
-	xdsB.priorityLowest = priorityLowest
+	edsImpl.priorityLowest = priorityLowest
 
 	// Delete priorities that are removed in the latest response, and also close
 	// the balancer group.
-	for p, bgwc := range xdsB.priorityToLocalities {
+	for p, bgwc := range edsImpl.priorityToLocalities {
 		if _, ok := newLocalitiesWithPriority[p]; !ok {
-			delete(xdsB.priorityToLocalities, p)
+			delete(edsImpl.priorityToLocalities, p)
 			bgwc.bg.close()
-			delete(xdsB.priorityToState, p)
+			delete(edsImpl.priorityToState, p)
 			priorityChanged = true
 		}
 	}

@@ -256,11 +255,11 @@ func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {
 	// E.g. priorityInUse was removed, or all priorities are down, and a new
 	// lower priority was added.
 	if priorityChanged {
-		xdsB.handlePriorityChange()
+		edsImpl.handlePriorityChange()
 	}
 }
 
-func (xdsB *EDSBalancer) handleEDSResponsePerPriority(bgwc *balancerGroupWithConfig, newLocalities []xdsclient.Locality) {
+func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroupWithConfig, newLocalities []xdsclient.Locality) {
 	// newLocalitiesSet contains all names of localities in the new EDS response
 	// for the same priority. It's used to delete localities that are removed in
 	// the new EDS response.

@@ -285,7 +284,7 @@ func (xdsB *EDSBalancer) handleEDSResponsePerPriority(bgwc *balancerGroupWithCon
 			address := resolver.Address{
 				Addr: lbEndpoint.Address,
 			}
-			if xdsB.subBalancerBuilder.Name() == weightedroundrobin.Name && lbEndpoint.Weight != 0 {
+			if edsImpl.subBalancerBuilder.Name() == weightedroundrobin.Name && lbEndpoint.Weight != 0 {
 				address.Metadata = &weightedroundrobin.AddrInfo{
 					Weight: lbEndpoint.Weight,
 				}

@@ -296,7 +295,7 @@ func (xdsB *EDSBalancer) handleEDSResponsePerPriority(bgwc *balancerGroupWithCon
 		config, ok := bgwc.configs[lid]
 		if !ok {
 			// A new balancer, add it to balancer group and balancer map.
-			bgwc.bg.add(lid, newWeight, xdsB.subBalancerBuilder)
+			bgwc.bg.add(lid, newWeight, edsImpl.subBalancerBuilder)
 			config = &localityConfig{
 				weight: newWeight,
 			}

@@ -336,19 +335,19 @@ func (xdsB *EDSBalancer) handleEDSResponsePerPriority(bgwc *balancerGroupWithCon
 }
 
 // HandleSubConnStateChange handles the state change and update pickers accordingly.
-func (xdsB *EDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
-	xdsB.subConnMu.Lock()
+func (edsImpl *edsBalancerImpl) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
+	edsImpl.subConnMu.Lock()
 	var bgwc *balancerGroupWithConfig
-	if p, ok := xdsB.subConnToPriority[sc]; ok {
+	if p, ok := edsImpl.subConnToPriority[sc]; ok {
 		if s == connectivity.Shutdown {
 			// Only delete sc from the map when state changed to Shutdown.
-			delete(xdsB.subConnToPriority, sc)
+			delete(edsImpl.subConnToPriority, sc)
 		}
-		bgwc = xdsB.priorityToLocalities[p]
+		bgwc = edsImpl.priorityToLocalities[p]
 	}
-	xdsB.subConnMu.Unlock()
+	edsImpl.subConnMu.Unlock()
 	if bgwc == nil {
-		grpclog.Infof("EDSBalancer: priority not found for sc state change")
+		grpclog.Infof("edsBalancerImpl: priority not found for sc state change")
 		return
 	}
 	if bg := bgwc.bg; bg != nil {

@@ -358,27 +357,27 @@ func (xdsB *EDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connect
 
 // updateState first handles priority, and then wraps picker in a drop picker
 // before forwarding the update.
-func (xdsB *EDSBalancer) updateState(priority priorityType, s balancer.State) {
-	_, ok := xdsB.priorityToLocalities[priority]
+func (edsImpl *edsBalancerImpl) updateState(priority priorityType, s balancer.State) {
+	_, ok := edsImpl.priorityToLocalities[priority]
 	if !ok {
 		grpclog.Infof("eds: received picker update from unknown priority")
 		return
 	}
 
-	if xdsB.handlePriorityWithNewState(priority, s) {
-		xdsB.pickerMu.Lock()
-		defer xdsB.pickerMu.Unlock()
-		xdsB.innerState = s
+	if edsImpl.handlePriorityWithNewState(priority, s) {
+		edsImpl.pickerMu.Lock()
+		defer edsImpl.pickerMu.Unlock()
+		edsImpl.innerState = s
 		// Don't reset drops when it's a state change.
-		xdsB.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: newDropPicker(s.Picker, xdsB.drops, xdsB.loadStore)})
+		edsImpl.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: newDropPicker(s.Picker, edsImpl.drops, edsImpl.loadStore)})
 	}
 }
 
-func (xdsB *EDSBalancer) ccWrapperWithPriority(priority priorityType) *edsBalancerWrapperCC {
+func (edsImpl *edsBalancerImpl) ccWrapperWithPriority(priority priorityType) *edsBalancerWrapperCC {
 	return &edsBalancerWrapperCC{
-		ClientConn: xdsB.cc,
+		ClientConn: edsImpl.cc,
 		priority:   priority,
-		parent:     xdsB,
+		parent:     edsImpl,
 	}
 }

@@ -387,7 +386,7 @@ func (xdsB *EDSBalancer) ccWrapperWithPriority(priority priorityType) *edsBalanc
 type edsBalancerWrapperCC struct {
 	balancer.ClientConn
 	priority priorityType
-	parent   *EDSBalancer
+	parent   *edsBalancerImpl
 }
 
 func (ebwcc *edsBalancerWrapperCC) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {

@@ -400,20 +399,20 @@ func (ebwcc *edsBalancerWrapperCC) UpdateState(state balancer.State) {
 	ebwcc.parent.updateState(ebwcc.priority, state)
 }
 
-func (xdsB *EDSBalancer) newSubConn(priority priorityType, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
-	sc, err := xdsB.cc.NewSubConn(addrs, opts)
+func (edsImpl *edsBalancerImpl) newSubConn(priority priorityType, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
+	sc, err := edsImpl.cc.NewSubConn(addrs, opts)
 	if err != nil {
 		return nil, err
 	}
-	xdsB.subConnMu.Lock()
-	xdsB.subConnToPriority[sc] = priority
-	xdsB.subConnMu.Unlock()
+	edsImpl.subConnMu.Lock()
+	edsImpl.subConnToPriority[sc] = priority
+	edsImpl.subConnMu.Unlock()
 	return sc, nil
 }
 
 // Close closes the balancer.
-func (xdsB *EDSBalancer) Close() {
-	for _, bgwc := range xdsB.priorityToLocalities {
+func (edsImpl *edsBalancerImpl) Close() {
+	for _, bgwc := range edsImpl.priorityToLocalities {
 		if bg := bgwc.bg; bg != nil {
 			bg.close()
 		}
@@ -38,28 +38,28 @@ import (
 // - If priorityInUse has a non-Ready state, and also there's a priority lower
 //   than priorityInUse (which means a lower priority was added), set the next
 //   priority as new priorityInUse, and start the bg.
-func (xdsB *EDSBalancer) handlePriorityChange() {
-	xdsB.priorityMu.Lock()
-	defer xdsB.priorityMu.Unlock()
+func (edsImpl *edsBalancerImpl) handlePriorityChange() {
+	edsImpl.priorityMu.Lock()
+	defer edsImpl.priorityMu.Unlock()
 
 	// Everything was removed by EDS.
-	if !xdsB.priorityLowest.isSet() {
-		xdsB.priorityInUse = newPriorityTypeUnset()
-		xdsB.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPickerV2(balancer.ErrTransientFailure)})
+	if !edsImpl.priorityLowest.isSet() {
+		edsImpl.priorityInUse = newPriorityTypeUnset()
+		edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPickerV2(balancer.ErrTransientFailure)})
 		return
 	}
 
 	// priorityInUse wasn't set, use 0.
-	if !xdsB.priorityInUse.isSet() {
-		xdsB.startPriority(newPriorityType(0))
+	if !edsImpl.priorityInUse.isSet() {
+		edsImpl.startPriority(newPriorityType(0))
 		return
 	}
 
 	// priorityInUse was deleted, use the new lowest.
-	if _, ok := xdsB.priorityToLocalities[xdsB.priorityInUse]; !ok {
-		xdsB.priorityInUse = xdsB.priorityLowest
-		if s, ok := xdsB.priorityToState[xdsB.priorityLowest]; ok {
-			xdsB.cc.UpdateState(*s)
+	if _, ok := edsImpl.priorityToLocalities[edsImpl.priorityInUse]; !ok {
+		edsImpl.priorityInUse = edsImpl.priorityLowest
+		if s, ok := edsImpl.priorityToState[edsImpl.priorityLowest]; ok {
+			edsImpl.cc.UpdateState(*s)
 		} else {
 			// If state for priorityLowest is not found, this means priorityLowest was
 			// started, but never sent any update. The init timer fired and

@@ -69,16 +69,16 @@ func (xdsB *EDSBalancer) handlePriorityChange() {
 			// We don't have an old state to send to parent, but we also don't
 			// want parent to keep using picker from old_priorityInUse. Send an
 			// update to trigger block picks until a new picker is ready.
-			xdsB.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPickerV2(balancer.ErrNoSubConnAvailable)})
+			edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPickerV2(balancer.ErrNoSubConnAvailable)})
 		}
 		return
 	}
 
 	// priorityInUse is not ready, look for next priority, and use if found.
-	if s, ok := xdsB.priorityToState[xdsB.priorityInUse]; ok && s.ConnectivityState != connectivity.Ready {
-		pNext := xdsB.priorityInUse.nextLower()
-		if _, ok := xdsB.priorityToLocalities[pNext]; ok {
-			xdsB.startPriority(pNext)
+	if s, ok := edsImpl.priorityToState[edsImpl.priorityInUse]; ok && s.ConnectivityState != connectivity.Ready {
+		pNext := edsImpl.priorityInUse.nextLower()
+		if _, ok := edsImpl.priorityToLocalities[pNext]; ok {
+			edsImpl.startPriority(pNext)
 		}
 	}
 }

@@ -86,11 +86,11 @@ func (xdsB *EDSBalancer) handlePriorityChange() {
 // startPriority sets priorityInUse to p, and starts the balancer group for p.
 // It also starts a timer to fall to next priority after timeout.
 //
-// Caller must hold priorityMu, priority must exist, and xdsB.priorityInUse must
-// be non-nil.
-func (xdsB *EDSBalancer) startPriority(priority priorityType) {
-	xdsB.priorityInUse = priority
-	p := xdsB.priorityToLocalities[priority]
+// Caller must hold priorityMu, priority must exist, and edsImpl.priorityInUse
+// must be non-nil.
+func (edsImpl *edsBalancerImpl) startPriority(priority priorityType) {
+	edsImpl.priorityInUse = priority
+	p := edsImpl.priorityToLocalities[priority]
 	// NOTE: this will eventually send addresses to sub-balancers. If the
 	// sub-balancer tries to update picker, it will result in a deadlock on
 	// priorityMu. But it's not an expected behavior for the balancer to

@@ -103,52 +103,52 @@ func (xdsB *EDSBalancer) startPriority(priority priorityType) {
 	//
 	// In all the cases, the existing init timer is either closed, also already
 	// expired. There's no need to close the old timer.
-	xdsB.priorityInitTimer = time.AfterFunc(defaultPriorityInitTimeout, func() {
-		xdsB.priorityMu.Lock()
-		defer xdsB.priorityMu.Unlock()
-		if !xdsB.priorityInUse.equal(priority) {
+	edsImpl.priorityInitTimer = time.AfterFunc(defaultPriorityInitTimeout, func() {
+		edsImpl.priorityMu.Lock()
+		defer edsImpl.priorityMu.Unlock()
+		if !edsImpl.priorityInUse.equal(priority) {
 			return
 		}
-		xdsB.priorityInitTimer = nil
+		edsImpl.priorityInitTimer = nil
 		pNext := priority.nextLower()
-		if _, ok := xdsB.priorityToLocalities[pNext]; ok {
-			xdsB.startPriority(pNext)
+		if _, ok := edsImpl.priorityToLocalities[pNext]; ok {
+			edsImpl.startPriority(pNext)
 		}
 	})
 }
 
 // handlePriorityWithNewState start/close priorities based on the connectivity
 // state. It returns whether the state should be forwarded to parent ClientConn.
-func (xdsB *EDSBalancer) handlePriorityWithNewState(priority priorityType, s balancer.State) bool {
-	xdsB.priorityMu.Lock()
-	defer xdsB.priorityMu.Unlock()
+func (edsImpl *edsBalancerImpl) handlePriorityWithNewState(priority priorityType, s balancer.State) bool {
+	edsImpl.priorityMu.Lock()
+	defer edsImpl.priorityMu.Unlock()
 
-	if !xdsB.priorityInUse.isSet() {
+	if !edsImpl.priorityInUse.isSet() {
 		grpclog.Infof("eds: received picker update when no priority is in use (EDS returned an empty list)")
 		return false
 	}
 
-	if xdsB.priorityInUse.higherThan(priority) {
+	if edsImpl.priorityInUse.higherThan(priority) {
 		// Lower priorities should all be closed, this is an unexpected update.
 		grpclog.Infof("eds: received picker update from priority lower then priorityInUse")
 		return false
 	}
 
-	bState, ok := xdsB.priorityToState[priority]
+	bState, ok := edsImpl.priorityToState[priority]
 	if !ok {
 		bState = &balancer.State{}
-		xdsB.priorityToState[priority] = bState
+		edsImpl.priorityToState[priority] = bState
 	}
 	oldState := bState.ConnectivityState
 	*bState = s
 
 	switch s.ConnectivityState {
 	case connectivity.Ready:
-		return xdsB.handlePriorityWithNewStateReady(priority)
+		return edsImpl.handlePriorityWithNewStateReady(priority)
 	case connectivity.TransientFailure:
-		return xdsB.handlePriorityWithNewStateTransientFailure(priority)
+		return edsImpl.handlePriorityWithNewStateTransientFailure(priority)
 	case connectivity.Connecting:
-		return xdsB.handlePriorityWithNewStateConnecting(priority, oldState)
+		return edsImpl.handlePriorityWithNewStateConnecting(priority, oldState)
 	default:
 		// New state is Idle, should never happen. Don't forward.
 		return false

@@ -169,19 +169,19 @@ func (xdsB *EDSBalancer) handlePriorityWithNewState(priority priorityType, s bal
 // Caller must make sure priorityInUse is not higher than priority.
 //
 // Caller must hold priorityMu.
-func (xdsB *EDSBalancer) handlePriorityWithNewStateReady(priority priorityType) bool {
+func (edsImpl *edsBalancerImpl) handlePriorityWithNewStateReady(priority priorityType) bool {
 	// If one priority higher or equal to priorityInUse goes Ready, stop the
 	// init timer. If update is from higher than priorityInUse,
 	// priorityInUse will be closed, and the init timer will become useless.
-	if timer := xdsB.priorityInitTimer; timer != nil {
+	if timer := edsImpl.priorityInitTimer; timer != nil {
 		timer.Stop()
-		xdsB.priorityInitTimer = nil
+		edsImpl.priorityInitTimer = nil
 	}
 
-	if xdsB.priorityInUse.lowerThan(priority) {
-		xdsB.priorityInUse = priority
-		for i := priority.nextLower(); !i.lowerThan(xdsB.priorityLowest); i = i.nextLower() {
-			xdsB.priorityToLocalities[i].bg.close()
+	if edsImpl.priorityInUse.lowerThan(priority) {
+		edsImpl.priorityInUse = priority
+		for i := priority.nextLower(); !i.lowerThan(edsImpl.priorityLowest); i = i.nextLower() {
+			edsImpl.priorityToLocalities[i].bg.close()
 		}
 		return true
 	}

@@ -205,20 +205,20 @@ func (xdsB *EDSBalancer) handlePriorityWithNewStateReady(priority priorityType)
 // Caller must make sure priorityInUse is not higher than priority.
 //
 // Caller must hold priorityMu.
-func (xdsB *EDSBalancer) handlePriorityWithNewStateTransientFailure(priority priorityType) bool {
-	if xdsB.priorityInUse.lowerThan(priority) {
+func (edsImpl *edsBalancerImpl) handlePriorityWithNewStateTransientFailure(priority priorityType) bool {
+	if edsImpl.priorityInUse.lowerThan(priority) {
 		return false
 	}
 	// priorityInUse sends a failure. Stop its init timer.
-	if timer := xdsB.priorityInitTimer; timer != nil {
+	if timer := edsImpl.priorityInitTimer; timer != nil {
 		timer.Stop()
-		xdsB.priorityInitTimer = nil
+		edsImpl.priorityInitTimer = nil
 	}
 	pNext := priority.nextLower()
-	if _, okNext := xdsB.priorityToLocalities[pNext]; !okNext {
+	if _, okNext := edsImpl.priorityToLocalities[pNext]; !okNext {
 		return true
 	}
-	xdsB.startPriority(pNext)
+	edsImpl.startPriority(pNext)
 	return true
 }

@@ -247,18 +247,18 @@ func (xdsB *EDSBalancer) handlePriorityWithNewStateTransientFailure(priority pri
 // Caller must make sure priorityInUse is not higher than priority.
 //
 // Caller must hold priorityMu.
-func (xdsB *EDSBalancer) handlePriorityWithNewStateConnecting(priority priorityType, oldState connectivity.State) bool {
-	if xdsB.priorityInUse.lowerThan(priority) {
+func (edsImpl *edsBalancerImpl) handlePriorityWithNewStateConnecting(priority priorityType, oldState connectivity.State) bool {
+	if edsImpl.priorityInUse.lowerThan(priority) {
 		return false
 	}
 
 	switch oldState {
 	case connectivity.Ready:
 		pNext := priority.nextLower()
-		if _, okNext := xdsB.priorityToLocalities[pNext]; !okNext {
+		if _, okNext := edsImpl.priorityToLocalities[pNext]; !okNext {
 			return true
 		}
-		xdsB.startPriority(pNext)
+		edsImpl.startPriority(pNext)
 		return true
 	case connectivity.Idle:
 		return true
@@ -33,7 +33,7 @@ import (
 // Init 0 and 1; 0 is up, use 0; add 2, use 0; remove 2, use 0.
 func TestEDSPriority_HighPriorityReady(t *testing.T) {
 	cc := newTestClientConn(t)
-	edsb := NewXDSBalancer(cc, nil)
+	edsb := newEDSBalancerImpl(cc, nil)
 
 	// Two localities, with priorities [0, 1], each with one backend.
 	clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)

@@ -99,7 +99,7 @@ func TestEDSPriority_HighPriorityReady(t *testing.T) {
 // down, use 2; remove 2, use 1.
 func TestEDSPriority_SwitchPriority(t *testing.T) {
 	cc := newTestClientConn(t)
-	edsb := NewXDSBalancer(cc, nil)
+	edsb := newEDSBalancerImpl(cc, nil)
 
 	// Two localities, with priorities [0, 1], each with one backend.
 	clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)

@@ -206,7 +206,7 @@ func TestEDSPriority_SwitchPriority(t *testing.T) {
 // Init 0 and 1; 0 and 1 both down; add 2, use 2.
 func TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
 	cc := newTestClientConn(t)
-	edsb := NewXDSBalancer(cc, nil)
+	edsb := newEDSBalancerImpl(cc, nil)
 
 	// Two localities, with different priorities, each with one backend.
 	clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)

@@ -270,7 +270,7 @@ func TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) {
 	defer time.Sleep(10 * time.Millisecond)
 
 	cc := newTestClientConn(t)
-	edsb := NewXDSBalancer(cc, nil)
+	edsb := newEDSBalancerImpl(cc, nil)
 
 	// Two localities, with priorities [0,1,2], each with one backend.
 	clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)

@@ -349,7 +349,7 @@ func TestEDSPriority_InitTimeout(t *testing.T) {
 	}()()
 
 	cc := newTestClientConn(t)
-	edsb := NewXDSBalancer(cc, nil)
+	edsb := newEDSBalancerImpl(cc, nil)
 
 	// Two localities, with different priorities, each with one backend.
 	clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)

@@ -398,7 +398,7 @@ func TestEDSPriority_InitTimeout(t *testing.T) {
 // - add localities to existing p0 and p1
 func TestEDSPriority_MultipleLocalities(t *testing.T) {
 	cc := newTestClientConn(t)
-	edsb := NewXDSBalancer(cc, nil)
+	edsb := newEDSBalancerImpl(cc, nil)
 
 	// Two localities, with different priorities, each with one backend.
 	clab0 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)

@@ -510,7 +510,7 @@ func TestEDSPriority_RemovesAllLocalities(t *testing.T) {
 	}()()
 
 	cc := newTestClientConn(t)
-	edsb := NewXDSBalancer(cc, nil)
+	edsb := newEDSBalancerImpl(cc, nil)
 
 	// Two localities, with different priorities, each with one backend.
 	clab0 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
@@ -52,7 +52,7 @@ func init() {
 // - change drop rate
 func TestEDS_OneLocality(t *testing.T) {
 	cc := newTestClientConn(t)
-	edsb := NewXDSBalancer(cc, nil)
+	edsb := newEDSBalancerImpl(cc, nil)
 
 	// One locality with one backend.
 	clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)

@@ -158,7 +158,7 @@ func TestEDS_OneLocality(t *testing.T) {
 // - update locality weight
 func TestEDS_TwoLocalities(t *testing.T) {
 	cc := newTestClientConn(t)
-	edsb := NewXDSBalancer(cc, nil)
+	edsb := newEDSBalancerImpl(cc, nil)
 
 	// Two localities, each with one backend.
 	clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)

@@ -288,7 +288,7 @@ func TestEDS_TwoLocalities(t *testing.T) {
 // healthy ones are used.
 func TestEDS_EndpointsHealth(t *testing.T) {
 	cc := newTestClientConn(t)
-	edsb := NewXDSBalancer(cc, nil)
+	edsb := newEDSBalancerImpl(cc, nil)
 
 	// Two localities, each 3 backend, one Healthy, one Unhealthy, one Unknown.
 	clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)

@@ -359,7 +359,7 @@ func TestEDS_EndpointsHealth(t *testing.T) {
 }
 
 func TestClose(t *testing.T) {
-	edsb := NewXDSBalancer(nil, nil)
+	edsb := newEDSBalancerImpl(nil, nil)
 	// This is what could happen when switching between fallback and eds. This
 	// make sure it doesn't panic.
 	edsb.Close()

@@ -416,7 +416,7 @@ func (tcp *testConstPicker) Pick(info balancer.PickInfo) (balancer.PickResult, e
 // eds response.
 func TestEDS_UpdateSubBalancerName(t *testing.T) {
 	cc := newTestClientConn(t)
-	edsb := NewXDSBalancer(cc, nil)
+	edsb := newEDSBalancerImpl(cc, nil)
 
 	t.Logf("update sub-balancer to test-const-balancer")
 	edsb.HandleChildPolicy("test-const-balancer", nil)

@@ -575,7 +575,7 @@ func TestEDS_LoadReport(t *testing.T) {
 	testLoadStore := newTestLoadStore()
 
 	cc := newTestClientConn(t)
-	edsb := NewXDSBalancer(cc, testLoadStore)
+	edsb := newEDSBalancerImpl(cc, testLoadStore)
 
 	backendToBalancerID := make(map[balancer.SubConn]internal.Locality)
 
@@ -16,7 +16,7 @@
  *
  */
 
-package balancer
+package edsbalancer
 
 import (
 	"bytes"

@@ -68,21 +68,21 @@ func Test(t *testing.T) {
 
 const testBalancerNameFooBar = "foo.bar"
 
-func newTestClientConn() *testClientConn {
-	return &testClientConn{newSubConns: testutils.NewChannelWithSize(10)}
+func newNoopTestClientConn() *noopTestClientConn {
+	return &noopTestClientConn{}
 }
 
-type testClientConn struct {
+// noopTestClientConn is used in EDS balancer config update tests that only
+// cover the config update handling, but not SubConn/load-balancing.
+type noopTestClientConn struct {
 	balancer.ClientConn
-	newSubConns *testutils.Channel
 }
 
-func (t *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
-	t.newSubConns.Send(addrs)
+func (t *noopTestClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
 	return nil, nil
 }
 
-func (testClientConn) Target() string { return testServiceName }
+func (noopTestClientConn) Target() string { return testServiceName }
 
 type scStateChange struct {
 	sc    balancer.SubConn

@@ -131,7 +131,7 @@ func (f *fakeEDSBalancer) waitForSubConnStateChange(wantState *scStateChange) er
 	return nil
 }
 
-func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface {
+func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerImplInterface {
 	return &fakeEDSBalancer{
 		cc:          cc,
 		childPolicy: testutils.NewChannelWithSize(10),

@@ -188,7 +188,7 @@ func waitForNewEDSLB(t *testing.T, ch *testutils.Channel) *fakeEDSBalancer {
 // cleanup.
 func setup(edsLBCh *testutils.Channel, xdsClientCh *testutils.Channel) func() {
 	origNewEDSBalancer := newEDSBalancer
-	newEDSBalancer = func(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface {
+	newEDSBalancer = func(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerImplInterface {
 		edsLB := newFakeEDSBalancer(cc, loadStore)
 		defer func() { edsLBCh.Send(edsLB) }()
 		return edsLB

@@ -221,7 +221,7 @@ func (s) TestXDSConfigBalancerNameUpdate(t *testing.T) {
 	defer cancel()
 
 	builder := balancer.Get(edsName)
-	cc := newTestClientConn()
+	cc := newNoopTestClientConn()
 	edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
 	if !ok {
 		t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)

@@ -233,7 +233,7 @@ func (s) TestXDSConfigBalancerNameUpdate(t *testing.T) {
 		balancerName := fmt.Sprintf("balancer-%d", i)
 		edsB.UpdateClientConnState(balancer.ClientConnState{
 			ResolverState: resolver.State{Addresses: addrs},
-			BalancerConfig: &XDSConfig{
+			BalancerConfig: &EDSConfig{
 				BalancerName:   balancerName,
 				EDSServiceName: testEDSClusterName,
 			},

@@ -303,7 +303,7 @@ func (s) TestXDSConnfigChildPolicyUpdate(t *testing.T) {
 	defer cancel()
 
 	builder := balancer.Get(edsName)
-	cc := newTestClientConn()
+	cc := newNoopTestClientConn()
 	edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}).(*edsBalancer)
 	if !ok {
 		t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)

@@ -311,7 +311,7 @@ func (s) TestXDSConnfigChildPolicyUpdate(t *testing.T) {
 	defer edsB.Close()
 
 	edsB.UpdateClientConnState(balancer.ClientConnState{
-		BalancerConfig: &XDSConfig{
+		BalancerConfig: &EDSConfig{
 			BalancerName: testBalancerNameFooBar,
 			ChildPolicy: &loadBalancingConfig{
 				Name: fakeBalancerA,

@@ -329,7 +329,7 @@ func (s) TestXDSConnfigChildPolicyUpdate(t *testing.T) {
 	})
 
 	edsB.UpdateClientConnState(balancer.ClientConnState{
-		BalancerConfig: &XDSConfig{
+		BalancerConfig: &EDSConfig{
 			BalancerName: testBalancerNameFooBar,
 			ChildPolicy: &loadBalancingConfig{
 				Name: fakeBalancerB,

@@ -353,7 +353,7 @@ func (s) TestXDSSubConnStateChange(t *testing.T) {
 	defer cancel()
 
 	builder := balancer.Get(edsName)
-	cc := newTestClientConn()
+	cc := newNoopTestClientConn()
 	edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
 	if !ok {
 		t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)

@@ -363,7 +363,7 @@ func (s) TestXDSSubConnStateChange(t *testing.T) {
 	addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
 	edsB.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState: resolver.State{Addresses: addrs},
-		BalancerConfig: &XDSConfig{
+		BalancerConfig: &EDSConfig{
 			BalancerName:   testBalancerNameFooBar,
 			EDSServiceName: testEDSClusterName,
 		},

@@ -411,7 +411,7 @@ func TestXDSBalancerConfigParsing(t *testing.T) {
 		{
 			name: "jsonpb-generated",
 			js:   b.Bytes(),
-			want: &XDSConfig{
+			want: &EDSConfig{
 				ChildPolicy: &loadBalancingConfig{
 					Name:   "round_robin",
 					Config: json.RawMessage("{}"),

@@ -444,7 +444,7 @@ func TestXDSBalancerConfigParsing(t *testing.T) {
 				"edsServiceName": "eds.service",
 				"lrsLoadReportingServerName": "lrs.server"
 			}`),
-			want: &XDSConfig{
+			want: &EDSConfig{
 				BalancerName: "fake.foo.bar",
 				ChildPolicy: &loadBalancingConfig{
 					Name: "fake_balancer_A",

@@ -468,7 +468,7 @@ func TestXDSBalancerConfigParsing(t *testing.T) {
 				"balancerName": "fake.foo.bar",
 				"edsServiceName": "eds.service"
 			}`),
-			want: &XDSConfig{
+			want: &EDSConfig{
 				BalancerName:               "fake.foo.bar",
 				EDSServiceName:             testEDSName,
 				LrsLoadReportingServerName: nil,

@@ -494,17 +494,17 @@ func TestLoadbalancingConfigParsing(t *testing.T) {
 	tests := []struct {
 		name string
 		s    string
-		want *XDSConfig
+		want *EDSConfig
 	}{
 		{
 			name: "empty",
 			s:    "{}",
-			want: &XDSConfig{},
+			want: &EDSConfig{},
 		},
 		{
 			name: "success1",
 			s:    `{"childPolicy":[{"pick_first":{}}]}`,
-			want: &XDSConfig{
+			want: &EDSConfig{
 				ChildPolicy: &loadBalancingConfig{
 					Name:   "pick_first",
 					Config: json.RawMessage(`{}`),

@@ -514,7 +514,7 @@ func TestLoadbalancingConfigParsing(t *testing.T) {
 		{
 			name: "success2",
 			s:    `{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`,
-			want: &XDSConfig{
+			want: &EDSConfig{
 				ChildPolicy: &loadBalancingConfig{
 					Name:   "round_robin",
 					Config: json.RawMessage(`{}`),

@@ -524,7 +524,7 @@ func TestLoadbalancingConfigParsing(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			var cfg XDSConfig
+			var cfg EDSConfig
 			if err := json.Unmarshal([]byte(tt.s), &cfg); err != nil || !reflect.DeepEqual(&cfg, tt.want) {
 				t.Errorf("test name: %s, parseFullServiceConfig() = %+v, err: %v, want %+v, <nil>", tt.name, cfg, err, tt.want)
 			}
@@ -16,7 +16,7 @@
  *
  */
 
-package balancer
+package edsbalancer
 
 import (
 	"google.golang.org/grpc"

@@ -122,7 +122,7 @@ func (c *xdsclientWrapper) replaceXDSClient(newClient xdsClientInterface, newBal
 // the balancerName (from bootstrap file or from service config) changed.
 // - if balancer names are the same, do nothing, and return false
 // - if balancer names are different, create new one, and return true
-func (c *xdsclientWrapper) updateXDSClient(config *XDSConfig, attr *attributes.Attributes) bool {
+func (c *xdsclientWrapper) updateXDSClient(config *EDSConfig, attr *attributes.Attributes) bool {
 	if attr != nil {
 		if clientFromAttr, _ := attr.Value(xdsinternal.XDSClientID).(xdsClientInterface); clientFromAttr != nil {
 			// This will also clear balancerName, to indicate that client is

@@ -217,7 +217,7 @@ func (c *xdsclientWrapper) startLoadReport(edsServiceNameBeingWatched string, lo
 
 // handleUpdate applies the service config and attributes updates to the client,
 // including updating the xds_client to use, and updating the EDS name to watch.
-func (c *xdsclientWrapper) handleUpdate(config *XDSConfig, attr *attributes.Attributes) {
+func (c *xdsclientWrapper) handleUpdate(config *EDSConfig, attr *attributes.Attributes) {
 	clientChanged := c.updateXDSClient(config, attr)
 
 	var (
@@ -16,7 +16,7 @@
  *
  */
 
-package balancer
+package edsbalancer
 
 import (
 	"errors"

@@ -93,7 +93,7 @@ func (s) TestClientWrapperWatchEDS(t *testing.T) {
 		},
 	} {
 		t.Run(test.name, func(t *testing.T) {
-			cw.handleUpdate(&XDSConfig{
+			cw.handleUpdate(&EDSConfig{
 				BalancerName:   fakeServer.Address,
 				EDSServiceName: test.edsServiceName,
 			}, nil)

@@ -140,7 +140,7 @@ func (s) TestClientWrapperHandleUpdateError(t *testing.T) {
 	defer cw.close()
 
 	xdsC := fakeclient.NewClient()
-	cw.handleUpdate(&XDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC))
+	cw.handleUpdate(&EDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC))
 	gotCluster, err := xdsC.WaitForWatchEDS()
 	if err != nil {
 		t.Fatalf("xdsClient.WatchEDS failed with error: %v", err)

@@ -175,7 +175,7 @@ func (s) TestClientWrapperGetsXDSClientInAttributes(t *testing.T) {
 
 	// Verify that the eds watch is registered for the expected resource name.
 	xdsC1 := fakeclient.NewClient()
-	cw.handleUpdate(&XDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC1))
+	cw.handleUpdate(&EDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC1))
 	gotCluster, err := xdsC1.WaitForWatchEDS()
 	if err != nil {
 		t.Fatalf("xdsClient.WatchEDS failed with error: %v", err)

@@ -189,7 +189,7 @@ func (s) TestClientWrapperGetsXDSClientInAttributes(t *testing.T) {
 	// (because clientWrapper only closes clients that it creates, it does not
 	// close client that are passed through attributes).
 	xdsC2 := fakeclient.NewClient()
-	cw.handleUpdate(&XDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC2))
+	cw.handleUpdate(&EDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC2))
 	gotCluster, err = xdsC2.WaitForWatchEDS()
 	if err != nil {
 		t.Fatalf("xdsClient.WatchEDS failed with error: %v", err)
@@ -16,7 +16,7 @@
  *
  */
 
-package balancer
+package edsbalancer
 
 import (
 	"testing"

@@ -33,7 +33,7 @@ import (
 // server (empty string).
 func (s) TestXDSLoadReporting(t *testing.T) {
 	builder := balancer.Get(edsName)
-	cc := newTestClientConn()
+	cc := newNoopTestClientConn()
 	edsB, ok := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSClusterName}}).(*edsBalancer)
 	if !ok {
 		t.Fatalf("builder.Build(%s) returned type {%T}, want {*edsBalancer}", edsName, edsB)

@@ -43,7 +43,7 @@ func (s) TestXDSLoadReporting(t *testing.T) {
 	xdsC := fakeclient.NewClient()
 	edsB.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState:  resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, xdsC)},
-		BalancerConfig: &XDSConfig{LrsLoadReportingServerName: new(string)},
+		BalancerConfig: &EDSConfig{LrsLoadReportingServerName: new(string)},
 	})
 
 	gotCluster, err := xdsC.WaitForWatchEDS()
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package balancer
+package edsbalancer
 
 import "google.golang.org/grpc/balancer"
 