grpc: perform graceful switching of LB policies in the ClientConn by default (#5285)

This commit is contained in:
Easwar Swaminathan
2022-04-01 13:14:35 -07:00
committed by GitHub
parent 3cccf6a43b
commit 0066bf69de
8 changed files with 441 additions and 426 deletions

View File

@ -20,136 +20,178 @@ package grpc
import (
"fmt"
"strings"
"sync"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
)
// scStateUpdate contains the subConn and the new state it changed to.
// ccBalancerWrapper sits between the ClientConn and the Balancer.
//
// ccBalancerWrapper implements methods corresponding to the ones on the
// balancer.Balancer interface. The ClientConn is free to call these methods
// concurrently and the ccBalancerWrapper ensures that calls from the ClientConn
// to the Balancer happen synchronously and in order.
//
// ccBalancerWrapper also implements the balancer.ClientConn interface and is
// passed to the Balancer implementations. It invokes unexported methods on the
// ClientConn to handle these calls from the Balancer.
//
// It uses the gracefulswitch.Balancer internally to ensure that balancer
// switches happen in a graceful manner.
type ccBalancerWrapper struct {
cc *ClientConn
// Since these fields are accessed only from handleXxx() methods which are
// synchronized by the watcher goroutine, we do not need a mutex to protect
// these fields.
balancer *gracefulswitch.Balancer
curBalancerName string
updateCh *buffer.Unbounded // Updates written on this channel are processed by watcher().
resultCh *buffer.Unbounded // Results of calls to UpdateClientConnState() are pushed here.
closed *grpcsync.Event // Indicates if close has been called.
done *grpcsync.Event // Indicates if close has completed its work.
}
// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer
// is not created until the switchTo() method is invoked.
func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
ccb := &ccBalancerWrapper{
cc: cc,
updateCh: buffer.NewUnbounded(),
resultCh: buffer.NewUnbounded(),
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
}
go ccb.watcher()
ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
return ccb
}
// The following xxxUpdate structs wrap the arguments received as part of the
// corresponding update. The watcher goroutine uses the 'type' of the update to
// invoke the appropriate handler routine to handle the update.
type ccStateUpdate struct {
ccs *balancer.ClientConnState
}
type scStateUpdate struct {
sc balancer.SubConn
state connectivity.State
err error
}
// exitIdle contains no data and is just a signal sent on the updateCh in
// ccBalancerWrapper to instruct the balancer to exit idle.
type exitIdle struct{}
type exitIdleUpdate struct{}
// ccBalancerWrapper is a wrapper on top of cc for balancers.
// It implements balancer.ClientConn interface.
type ccBalancerWrapper struct {
cc *ClientConn
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
hasExitIdle bool
updateCh *buffer.Unbounded
closed *grpcsync.Event
done *grpcsync.Event
mu sync.Mutex
subConns map[*acBalancerWrapper]struct{}
type resolverErrorUpdate struct {
err error
}
func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
ccb := &ccBalancerWrapper{
cc: cc,
updateCh: buffer.NewUnbounded(),
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
subConns: make(map[*acBalancerWrapper]struct{}),
}
go ccb.watcher()
ccb.balancer = b.Build(ccb, bopts)
_, ccb.hasExitIdle = ccb.balancer.(balancer.ExitIdler)
return ccb
type switchToUpdate struct {
name string
}
// watcher invokes balancer functions sequentially, so the balancer can be
// implemented lock-free.
type subConnUpdate struct {
acbw *acBalancerWrapper
}
// watcher is a long-running goroutine which reads updates from a channel and
// invokes corresponding methods on the underlying balancer. It ensures that
// these methods are invoked in a synchronous fashion. It also ensures that
// these methods are invoked in the order in which the updates were received.
func (ccb *ccBalancerWrapper) watcher() {
for {
select {
case t := <-ccb.updateCh.Get():
case u := <-ccb.updateCh.Get():
ccb.updateCh.Load()
if ccb.closed.HasFired() {
break
}
switch u := t.(type) {
switch update := u.(type) {
case *ccStateUpdate:
ccb.handleClientConnStateChange(update.ccs)
case *scStateUpdate:
ccb.balancerMu.Lock()
ccb.balancer.UpdateSubConnState(u.sc, balancer.SubConnState{ConnectivityState: u.state, ConnectionError: u.err})
ccb.balancerMu.Unlock()
case *acBalancerWrapper:
ccb.mu.Lock()
if ccb.subConns != nil {
delete(ccb.subConns, u)
ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain)
}
ccb.mu.Unlock()
case exitIdle:
if ccb.cc.GetState() == connectivity.Idle {
if ei, ok := ccb.balancer.(balancer.ExitIdler); ok {
// We already checked that the balancer implements
// ExitIdle before pushing the event to updateCh, but
// check conditionally again as defensive programming.
ccb.balancerMu.Lock()
ei.ExitIdle()
ccb.balancerMu.Unlock()
}
}
ccb.handleSubConnStateChange(update)
case *exitIdleUpdate:
ccb.handleExitIdle()
case *resolverErrorUpdate:
ccb.handleResolverError(update.err)
case *switchToUpdate:
ccb.handleSwitchTo(update.name)
case *subConnUpdate:
ccb.handleRemoveSubConn(update.acbw)
default:
logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t)
logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", update, update)
}
case <-ccb.closed.Done():
}
if ccb.closed.HasFired() {
ccb.balancerMu.Lock()
ccb.balancer.Close()
ccb.balancerMu.Unlock()
ccb.mu.Lock()
scs := ccb.subConns
ccb.subConns = nil
ccb.mu.Unlock()
ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil})
ccb.done.Fire()
// Fire done before removing the addr conns. We can safely unblock
// ccb.close and allow the removeAddrConns to happen
// asynchronously.
for acbw := range scs {
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
}
ccb.handleClose()
return
}
}
}
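The watcher above is what makes the handleXxx() methods safe without a mutex. As a rough, self-contained sketch of the same serialize-through-one-goroutine pattern (hypothetical names; a plain channel stands in for the internal buffer.Unbounded type, and this is not part of the change itself):

package main

import "fmt"

// Hypothetical update types; the watcher switches on the concrete type to
// decide which handler to run.
type stateUpdate struct{ state string }
type policyUpdate struct{ name string }

// wrapper serializes all handler invocations through one goroutine, so the
// handlers themselves never need a mutex.
type wrapper struct {
	updateCh chan interface{} // updates pushed by callers
	closed   chan struct{}    // signals the watcher to stop
	done     chan struct{}    // closed by the watcher when it exits
}

func newWrapper() *wrapper {
	w := &wrapper{
		updateCh: make(chan interface{}),
		closed:   make(chan struct{}),
		done:     make(chan struct{}),
	}
	go w.watcher()
	return w
}

// watcher handles updates one at a time, in the order they were sent.
func (w *wrapper) watcher() {
	defer close(w.done)
	for {
		select {
		case u := <-w.updateCh:
			switch u := u.(type) {
			case *policyUpdate:
				fmt.Println("switching to:", u.name)
			case *stateUpdate:
				fmt.Println("new state:", u.state)
			default:
				fmt.Printf("unknown update type %T\n", u)
			}
		case <-w.closed:
			return
		}
	}
}

func main() {
	w := newWrapper()
	w.updateCh <- &policyUpdate{name: "round_robin"}
	w.updateCh <- &stateUpdate{state: "READY"}
	close(w.closed) // ask the watcher to stop...
	<-w.done        // ...and wait until it has drained and exited
}

Because only the watcher goroutine ever calls the handlers, they can read and write shared fields without locking, which is the property the real wrapper relies on.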
func (ccb *ccBalancerWrapper) close() {
if ccb == nil {
return
// updateClientConnState is invoked by grpc to push a ClientConnState update to
// the underlying balancer.
//
// Unlike other methods invoked by grpc to push updates to the underlying
// balancer, this method cannot simply push the update onto the update channel
// and return. It needs to return the error returned by the underlying balancer
// back to grpc which propagates that to the resolver.
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
ccb.updateCh.Put(&ccStateUpdate{ccs: ccs})
var res interface{}
select {
case res = <-ccb.resultCh.Get():
ccb.resultCh.Load()
case <-ccb.closed.Done():
// Return early if the balancer wrapper is closed while we are waiting for
// the underlying balancer to process a ClientConnState update.
return nil
}
ccb.closed.Fire()
<-ccb.done.Done()
// If the returned error is nil, attempting to type assert to error leads to
// panic. So, this needs to be handled separately.
if res == nil {
return nil
}
return res.(error)
}
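The nil check above exists because type-asserting a nil interface value to error panics. A small, self-contained illustration of the pitfall, separate from this change:

package main

import (
	"errors"
	"fmt"
)

// report mimics pulling a result off an interface-typed channel: the value
// may be a non-nil error or a nil interface.
func report(res interface{}) {
	if res == nil {
		// res.(error) here would panic with:
		// "interface conversion: interface {} is nil, not error"
		fmt.Println("update succeeded")
		return
	}
	fmt.Println("update failed:", res.(error))
}

func main() {
	results := make(chan interface{}, 2)
	results <- error(nil)                  // success: arrives as a nil interface
	results <- errors.New("bad LB config") // failure: arrives as a non-nil error

	report(<-results)
	report(<-results)
}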
func (ccb *ccBalancerWrapper) exitIdle() bool {
if ccb == nil {
return true
// handleClientConnStateChange handles a ClientConnState update from the update
// channel and invokes the appropriate method on the underlying balancer.
//
// If the addresses specified in the update contain addresses of type "grpclb"
// and the selected LB policy is not "grpclb", these addresses will be filtered
// out and ccs will be modified with the updated address list.
func (ccb *ccBalancerWrapper) handleClientConnStateChange(ccs *balancer.ClientConnState) {
if ccb.curBalancerName != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
var addrs []resolver.Address
for _, addr := range ccs.ResolverState.Addresses {
if addr.Type == resolver.GRPCLB {
continue
}
addrs = append(addrs, addr)
}
ccs.ResolverState.Addresses = addrs
}
if !ccb.hasExitIdle {
return false
}
ccb.updateCh.Put(exitIdle{})
return true
ccb.resultCh.Put(ccb.balancer.UpdateClientConnState(*ccs))
}
func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
// updateSubConnState is invoked by grpc to push a subConn state update to the
// underlying balancer.
func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
// When updating addresses for a SubConn, if the address in use is not in
// the new addresses, the old ac will be tearDown() and a new ac will be
// created. tearDown() generates a state change with Shutdown state, we
@ -167,39 +209,97 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
})
}
// updateClientConnState forwards the clientConn update to the wrapped balancer
// synchronously.
//
// Other calls from the channel like exitIdle() and handleSubConnStateChange()
// are handled asynchronously by pushing the update onto a channel, which is
// picked up by the watcher() goroutine and forwarded to the wrapped balancer.
// That approach cannot be taken here because the corresponding API on the
// balancer returns an error which needs to be sent back to the channel to be
// forwarded to the resolver.
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
ccb.balancerMu.Lock()
defer ccb.balancerMu.Unlock()
return ccb.balancer.UpdateClientConnState(*ccs)
// handleSubConnStateChange handles a SubConnState update from the update
// channel and invokes the appropriate method on the underlying balancer.
func (ccb *ccBalancerWrapper) handleSubConnStateChange(update *scStateUpdate) {
ccb.balancer.UpdateSubConnState(update.sc, balancer.SubConnState{ConnectivityState: update.state, ConnectionError: update.err})
}
func (ccb *ccBalancerWrapper) exitIdle() {
ccb.updateCh.Put(&exitIdleUpdate{})
}
func (ccb *ccBalancerWrapper) handleExitIdle() {
if ccb.cc.GetState() != connectivity.Idle {
return
}
ccb.balancer.ExitIdle()
}
func (ccb *ccBalancerWrapper) resolverError(err error) {
if ccb == nil {
ccb.updateCh.Put(&resolverErrorUpdate{err: err})
}
func (ccb *ccBalancerWrapper) handleResolverError(err error) {
ccb.balancer.ResolverError(err)
}
// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the
// LB policy identified by name.
//
// ClientConn calls newCCBalancerWrapper() at creation time. Upon receipt of the
// first good update from the name resolver, it determines the LB policy to use
// and invokes the switchTo() method. Upon receipt of every subsequent update
// from the name resolver, it invokes this method.
//
// The ccBalancerWrapper keeps track of the current LB policy name, and skips
// the graceful balancer switching process if the name does not change.
func (ccb *ccBalancerWrapper) switchTo(name string) {
ccb.updateCh.Put(&switchToUpdate{name: name})
}
// handleSwitchTo handles a balancer switch update from the update channel. It
// calls the SwitchTo() method on the gracefulswitch.Balancer with a
// balancer.Builder corresponding to name. If no balancer.Builder is registered
// for the given name, it uses the default LB policy which is "pick_first".
func (ccb *ccBalancerWrapper) handleSwitchTo(name string) {
// TODO: Other languages use case-insensitive balancer registries. We should
// switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
if strings.EqualFold(ccb.curBalancerName, name) {
return
}
ccb.balancerMu.Lock()
defer ccb.balancerMu.Unlock()
ccb.balancer.ResolverError(err)
// TODO: Ensure that name is a registered LB policy when we get here.
// We currently only validate the `loadBalancingConfig` field. We need to do
// the same for the `loadBalancingPolicy` field and reject the service config
// if the specified policy is not registered.
builder := balancer.Get(name)
if builder == nil {
channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
builder = newPickfirstBuilder()
} else {
channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name)
}
if err := ccb.balancer.SwitchTo(builder); err != nil {
channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err)
return
}
ccb.curBalancerName = builder.Name()
}
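handleSwitchTo only ever finds policies that have been registered with the balancer package; anything else falls back to pick_first as shown above. As a hedged sketch (the policy name and picker here are invented for illustration, not part of this change), a custom policy becomes discoverable by balancer.Get() by registering a builder, for example via the balancer/base helper:

package main

import (
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
)

// firstReadyPickerBuilder builds a trivial picker that always returns the
// first READY SubConn it finds. It exists only to make the example complete.
type firstReadyPickerBuilder struct{}

func (firstReadyPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
	for sc := range info.ReadySCs {
		return &firstReadyPicker{sc: sc}
	}
	return &firstReadyPicker{}
}

type firstReadyPicker struct{ sc balancer.SubConn }

func (p *firstReadyPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	if p.sc == nil {
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}
	return balancer.PickResult{SubConn: p.sc}, nil
}

func init() {
	// Register under the name that a service config's loadBalancingConfig (or
	// loadBalancingPolicy) would refer to, so balancer.Get("example_first_ready")
	// returns a non-nil builder instead of falling back to pick_first.
	balancer.Register(base.NewBalancerBuilder("example_first_ready", firstReadyPickerBuilder{}, base.Config{}))
}

func main() {}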
// handleRemoveSubConn handles a request from the underlying balancer to remove
// a subConn.
//
// See comments in RemoveSubConn() for more details.
func (ccb *ccBalancerWrapper) handleRemoveSubConn(acbw *acBalancerWrapper) {
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
}
func (ccb *ccBalancerWrapper) close() {
ccb.closed.Fire()
<-ccb.done.Done()
}
func (ccb *ccBalancerWrapper) handleClose() {
ccb.balancer.Close()
ccb.done.Fire()
}
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
if len(addrs) <= 0 {
return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
}
ccb.mu.Lock()
defer ccb.mu.Unlock()
if ccb.subConns == nil {
return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
}
ac, err := ccb.cc.newAddrConn(addrs, opts)
if err != nil {
channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
@ -209,15 +309,25 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
acbw.ac.mu.Lock()
ac.acbw = acbw
acbw.ac.mu.Unlock()
ccb.subConns[acbw] = struct{}{}
return acbw, nil
}
func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
// The RemoveSubConn() is handled in the run() goroutine, to avoid deadlock
// during switchBalancer() if the old balancer calls RemoveSubConn() in its
// Close().
ccb.updateCh.Put(sc)
// Before we switched the ccBalancerWrapper to use gracefulswitch.Balancer, it
// was required to handle the RemoveSubConn() method asynchronously by pushing
// the update onto the update channel. This was done to avoid a deadlock as
// switchBalancer() was holding cc.mu when calling Close() on the old
// balancer, which would in turn call RemoveSubConn().
//
// With the use of gracefulswitch.Balancer in ccBalancerWrapper, handling this
// asynchronously is probably not required anymore since the switchTo() method
// handles the balancer switch by pushing the update onto the channel.
// TODO(easwars): Handle this inline.
acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
}
ccb.updateCh.Put(&subConnUpdate{acbw: acbw})
}
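The deadlock mentioned in the comment above is the usual re-entrant callback problem: the caller holds a lock while invoking Close(), and Close() calls back into a method that would need the same lock. A minimal, self-contained sketch of the queue-instead-of-lock workaround, with hypothetical names unrelated to the gRPC API:

package main

import (
	"fmt"
	"sync"
)

type conn struct {
	mu      sync.Mutex
	pending chan string // work queued by re-entrant callbacks
}

// switchPolicy holds c.mu while closing the old policy. If the close
// callback tried to lock c.mu again (sync.Mutex is not re-entrant), the
// program would deadlock; queueing the work keeps the callback lock-free.
func (c *conn) switchPolicy(closeOld func()) {
	c.mu.Lock()
	defer c.mu.Unlock()
	closeOld()
}

// removeSubConn is called from inside closeOld; it only enqueues the
// removal, which is performed later, once c.mu is no longer held.
func (c *conn) removeSubConn(name string) {
	c.pending <- name
}

func main() {
	c := &conn{pending: make(chan string, 1)}
	c.switchPolicy(func() { c.removeSubConn("subconn-1") })
	fmt.Println("removing", <-c.pending) // processed after the lock is released
}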
func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
@ -229,11 +339,6 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol
}
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
ccb.mu.Lock()
defer ccb.mu.Unlock()
if ccb.subConns == nil {
return
}
// Update picker before updating state. Even though the ordering here does
// not matter, it can lead to multiple calls of Pick in the common start-up
// case where we wait for ready and then perform an RPC. If the picker is

View File

@ -278,7 +278,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if creds := cc.dopts.copts.TransportCredentials; creds != nil {
credsClone = creds.Clone()
}
cc.balancerBuildOpts = balancer.BuildOptions{
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
@ -286,7 +286,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
}
})
// Build the resolver.
rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
@ -465,12 +465,12 @@ type ClientConn struct {
cancel context.CancelFunc // Cancelled on close.
// The following are initialized at dial time, and are read-only after that.
target string // User's dial target.
parsedTarget resolver.Target // See parseTargetAndFindResolver().
authority string // See determineAuthority().
dopts dialOptions // Default and user specified dial options.
balancerBuildOpts balancer.BuildOptions // TODO: delete once we move to the gracefulswitch balancer.
channelzID *channelz.Identifier // Channelz identifier for the channel.
target string // User's dial target.
parsedTarget resolver.Target // See parseTargetAndFindResolver().
authority string // See determineAuthority().
dopts dialOptions // Default and user specified dial options.
channelzID *channelz.Identifier // Channelz identifier for the channel.
balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath.
// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
@ -491,8 +491,6 @@ type ClientConn struct {
sc *ServiceConfig // Latest service config received from the resolver.
conns map[*addrConn]struct{} // Set to nil on close.
mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway.
curBalancerName string // TODO: delete as part of https://github.com/grpc/grpc-go/issues/5229.
balancerWrapper *ccBalancerWrapper // TODO: Use gracefulswitch balancer to be able to initialize this once and never rewrite.
lceMu sync.Mutex // protects lastConnectionError
lastConnectionError error
@ -537,14 +535,7 @@ func (cc *ClientConn) GetState() connectivity.State {
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func (cc *ClientConn) Connect() {
cc.mu.Lock()
defer cc.mu.Unlock()
if cc.balancerWrapper.exitIdle() {
return
}
for ac := range cc.conns {
go ac.connect()
}
cc.balancerWrapper.exitIdle()
}
func (cc *ClientConn) scWatcher() {
@ -666,21 +657,9 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
if cc.sc != nil && cc.sc.lbConfig != nil {
balCfg = cc.sc.lbConfig.cfg
}
cbn := cc.curBalancerName
bw := cc.balancerWrapper
cc.mu.Unlock()
if cbn != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
var addrs []resolver.Address
for _, addr := range s.Addresses {
if addr.Type == resolver.GRPCLB {
continue
}
addrs = append(addrs, addr)
}
s.Addresses = addrs
}
uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
if ret == nil {
ret = uccsErr // prefer ErrBadResolver state since any other error is
@ -709,50 +688,8 @@ func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) {
cc.csMgr.updateState(connectivity.TransientFailure)
}
// switchBalancer starts the switching from current balancer to the balancer
// with the given name.
//
// It will NOT send the current address list to the new balancer. If needed,
// caller of this function should send address list to the new balancer after
// this function returns.
//
// Caller must hold cc.mu.
func (cc *ClientConn) switchBalancer(name string) {
if strings.EqualFold(cc.curBalancerName, name) {
return
}
channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
// Don't hold cc.mu while closing the balancers. The balancers may call
// methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
// would cause a deadlock in that case.
cc.mu.Unlock()
cc.balancerWrapper.close()
cc.mu.Lock()
builder := balancer.Get(name)
if builder == nil {
channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
builder = newPickfirstBuilder()
} else {
channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)
}
cc.curBalancerName = builder.Name()
cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}
func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
return
}
// TODO(bar switching) send updates to all balancer wrappers when balancer
// gracefully switching is supported.
cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
cc.mu.Unlock()
cc.balancerWrapper.updateSubConnState(sc, s, err)
}
// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
@ -1002,8 +939,6 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
cc.retryThrottler.Store((*retryThrottler)(nil))
}
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var newBalancerName string
if cc.sc != nil && cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
@ -1023,7 +958,7 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
newBalancerName = PickFirstBalancerName
}
}
cc.switchBalancer(newBalancerName)
cc.balancerWrapper.switchTo(newBalancerName)
}
func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
@ -1074,11 +1009,11 @@ func (cc *ClientConn) Close() error {
rWrapper := cc.resolverWrapper
cc.resolverWrapper = nil
bWrapper := cc.balancerWrapper
cc.balancerWrapper = nil
cc.mu.Unlock()
// The order of closing matters here since the balancer wrapper assumes the
// picker is closed before it is closed.
cc.blockingpicker.close()
if bWrapper != nil {
bWrapper.close()
}

View File

@ -845,9 +845,13 @@ func (s) TestBackoffCancel(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
<-dialStrCh
cc.Close()
// Should not leak. May need -count 5000 to exercise.
defer cc.Close()
select {
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout when waiting for custom dialer to be invoked during Dial")
case <-dialStrCh:
}
}
// UpdateAddresses should cause the next reconnect to begin from the top of the

View File

@ -178,6 +178,9 @@ func (gsb *Balancer) ResolverError(err error) {
}
// ExitIdle forwards the call to the latest balancer created.
//
// If the latest balancer does not support ExitIdle, the subConns are
// re-connected manually.
func (gsb *Balancer) ExitIdle() {
balToUpdate := gsb.latestBalancer()
if balToUpdate == nil {
@ -188,6 +191,10 @@ func (gsb *Balancer) ExitIdle() {
// called.
if ei, ok := balToUpdate.Balancer.(balancer.ExitIdler); ok {
ei.ExitIdle()
return
}
for sc := range balToUpdate.subconns {
sc.Connect()
}
}
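The forwarding path above is only taken when the wrapped balancer satisfies balancer.ExitIdler. A short, hedged sketch (the wrapper type is hypothetical, not part of this change) of a delegating balancer opting in:

package main

import "google.golang.org/grpc/balancer"

// idleAwareBalancer is a hypothetical wrapper that delegates to an inner
// balancer. Because it has an ExitIdle method, gracefulswitch.Balancer can
// forward ExitIdle to it instead of reconnecting its SubConns one by one.
type idleAwareBalancer struct {
	balancer.Balancer // the wrapped delegate
}

// ExitIdle forwards to the delegate when it also supports ExitIdler;
// otherwise it is a no-op in this sketch.
func (b *idleAwareBalancer) ExitIdle() {
	if ei, ok := b.Balancer.(balancer.ExitIdler); ok {
		ei.ExitIdle()
	}
}

// Compile-time check that the wrapper satisfies balancer.ExitIdler.
var _ balancer.ExitIdler = (*idleAwareBalancer)(nil)

func main() {}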

View File

@ -19,7 +19,12 @@
// Package stub implements a balancer for testing purposes.
package stub
import "google.golang.org/grpc/balancer"
import (
"encoding/json"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/serviceconfig"
)
// BalancerFuncs contains all balancer.Balancer functions with a preceding
// *BalancerData parameter for passing additional instance information. Any
@ -28,6 +33,8 @@ type BalancerFuncs struct {
// Init is called after ClientConn and BuildOptions are set in
// BalancerData. It may be used to initialize BalancerData.Data.
Init func(*BalancerData)
// ParseConfig is used for parsing LB configs, if specified.
ParseConfig func(json.RawMessage) (serviceconfig.LoadBalancingConfig, error)
UpdateClientConnState func(*BalancerData, balancer.ClientConnState) error
ResolverError func(*BalancerData, error)
@ -97,6 +104,13 @@ func (bb bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
func (bb bb) Name() string { return bb.name }
func (bb bb) ParseConfig(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
if bb.bf.ParseConfig != nil {
return bb.bf.ParseConfig(lbCfg)
}
return nil, nil
}
// Register registers a stub balancer builder which will call the provided
// functions. The name used should be unique.
func Register(name string, bf BalancerFuncs) {

View File

@ -21,9 +21,7 @@ package test
import (
"context"
"fmt"
"strings"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
@ -43,21 +41,30 @@ const (
loadBalancedServicePort = 443
wantGRPCLBTraceDesc = `Channel switches to new LB policy "grpclb"`
wantRoundRobinTraceDesc = `Channel switches to new LB policy "round_robin"`
wantPickFirstTraceDesc = `Channel switches to new LB policy "pick_first"`
// This is the number of stub backends set up at the start of each test. The
// first backend is used for the "grpclb" policy and the rest are used for
// other LB policies to test balancer switching.
backendCount = 3
)
// setupBackendsAndFakeGRPCLB sets up the stub server backends and a fake grpclb
// server for tests which exercise balancer switch scenarios involving grpclb.
// setupBackendsAndFakeGRPCLB sets up backendCount number of stub server
// backends and a fake grpclb server for tests which exercise balancer switch
// scenarios involving grpclb.
//
// The fake grpclb server always returns the first of the configured stub
// backends as backend addresses. So, the tests are free to use the other
// backends with other LB policies to verify balancer switching scenarios.
//
// Returns a cleanup function to be invoked by the caller.
func setupBackendsAndFakeGRPCLB(t *testing.T) ([]*stubserver.StubServer, *fakegrpclb.Server, func()) {
czCleanup := channelz.NewChannelzStorageForTesting()
backends, backendsCleanup := startBackendsForBalancerSwitch(t)
rawAddrs := stubBackendsToRawAddrs(backends)
lbServer, err := fakegrpclb.NewServer(fakegrpclb.ServerParams{
LoadBalancedServiceName: loadBalancedServiceName,
LoadBalancedServicePort: loadBalancedServicePort,
BackendAddresses: rawAddrs,
BackendAddresses: []string{backends[0].Address},
})
if err != nil {
t.Fatalf("failed to create fake grpclb server: %v", err)
@ -81,7 +88,6 @@ func setupBackendsAndFakeGRPCLB(t *testing.T) ([]*stubserver.StubServer, *fakegr
func startBackendsForBalancerSwitch(t *testing.T) ([]*stubserver.StubServer, func()) {
t.Helper()
const backendCount = 3
backends := make([]*stubserver.StubServer, backendCount)
for i := 0; i < backendCount; i++ {
backend := &stubserver.StubServer{
@ -100,40 +106,6 @@ func startBackendsForBalancerSwitch(t *testing.T) ([]*stubserver.StubServer, fun
}
}
// stubBackendsToRawAddrs converts from a set of stub server backends to raw
// address strings. Useful when pushing addresses to the fake grpclb server.
func stubBackendsToRawAddrs(backends []*stubserver.StubServer) []string {
addrs := make([]string, len(backends))
for i, backend := range backends {
addrs[i] = backend.Address
}
return addrs
}
// checkForTraceEvent looks for a trace event in the top level channel matching
// the given description. Events before since are ignored. Returns nil error if
// such an event is found.
func checkForTraceEvent(ctx context.Context, wantDesc string, since time.Time) error {
for {
if err := ctx.Err(); err != nil {
return err
}
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return fmt.Errorf("channelz returned %d top channels, want 1", len(tcs))
}
for _, event := range tcs[0].Trace.Events {
if event.Timestamp.Before(since) {
continue
}
if strings.Contains(event.Desc, wantDesc) {
return nil
}
}
time.Sleep(defaultTestShortTimeout)
}
}
// TestBalancerSwitch_Basic tests the basic scenario of switching from one LB
// policy to another, as specified in the service config.
func (s) TestBalancerSwitch_Basic(t *testing.T) {
@ -193,42 +165,31 @@ func (s) TestBalancerSwitch_grpclbToPickFirst(t *testing.T) {
// Push a resolver update with no service config and a single address pointing
// to the grpclb server we created above. This will cause the channel to
// switch to the "grpclb" balancer, and will equally distribute RPCs across
// the backends as the fake grpclb server does not support load reporting from
// the clients.
now := time.Now()
// switch to the "grpclb" balancer, which returns a single backend address.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
if err := checkRoundRobin(ctx, cc, addrs[0:1]); err != nil {
t.Fatal(err)
}
if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err)
}
// Push a resolver update containing a non-existent grpclb server address.
// This should not lead to a balancer switch.
now = time.Now()
const nonExistentServer = "non-existent-grpclb-server-address"
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: nonExistentServer, Type: resolver.GRPCLB}}})
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
wantDesc := fmt.Sprintf("Channel switches to new LB policy %q", nonExistentServer)
if err := checkForTraceEvent(sCtx, wantDesc, now); err == nil {
t.Fatal("channel switched balancers when expected not to")
if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil {
t.Fatal(err)
}
// Push a resolver update containing no grpclb server address. This should
// lead to the channel using the default LB policy which is pick_first.
now = time.Now()
r.UpdateState(resolver.State{Addresses: addrs})
if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil {
// lead to the channel using the default LB policy which is pick_first. The
// list of addresses pushed as part of this update is different from the one
// returned by the "grpclb" balancer. So, we should see RPCs going to the
// newly configured backends, as part of the balancer switch.
r.UpdateState(resolver.State{Addresses: addrs[1:]})
if err := checkPickFirst(ctx, cc, addrs[1].Addr); err != nil {
t.Fatal(err)
}
if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err)
}
}
// TestBalancerSwitch_pickFirstToGRPCLB tests the scenario where the channel
@ -248,50 +209,31 @@ func (s) TestBalancerSwitch_pickFirstToGRPCLB(t *testing.T) {
// Push a resolver update containing no grpclb server address. This should
// lead to the channel using the default LB policy which is pick_first.
now := time.Now()
r.UpdateState(resolver.State{Addresses: addrs})
r.UpdateState(resolver.State{Addresses: addrs[1:]})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err)
}
if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil {
if err := checkPickFirst(ctx, cc, addrs[1].Addr); err != nil {
t.Fatal(err)
}
// Push a resolver update with no service config and a single address pointing
// to the grpclb server we created above. This will cause the channel to
// switch to the "grpclb" balancer, and will equally distribute RPCs across
// the backends as the fake grpclb server does not support load reporting from
// the clients.
now = time.Now()
// switch to the "grpclb" balancer, which returns a single backend address.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}})
if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err)
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil {
t.Fatal(err)
}
// Push a resolver update containing a non-existent grpclb server address.
// This should not lead to a balancer switch.
now = time.Now()
const nonExistentServer = "non-existent-grpclb-server-address"
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: nonExistentServer, Type: resolver.GRPCLB}}})
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
wantDesc := fmt.Sprintf("Channel switches to new LB policy %q", nonExistentServer)
if err := checkForTraceEvent(sCtx, wantDesc, now); err == nil {
t.Fatal("channel switched balancers when expected not to")
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "nonExistentServer", Type: resolver.GRPCLB}}})
if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil {
t.Fatal(err)
}
// Switch to "pick_first" again by sending no grpclb server addresses.
now = time.Now()
r.UpdateState(resolver.State{Addresses: addrs})
if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err)
}
if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil {
r.UpdateState(resolver.State{Addresses: addrs[1:]})
if err := checkPickFirst(ctx, cc, addrs[1].Addr); err != nil {
t.Fatal(err)
}
}
@ -327,47 +269,27 @@ func (s) TestBalancerSwitch_RoundRobinToGRPCLB(t *testing.T) {
scpr := parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`)
// Push a resolver update with the service config specifying "round_robin".
now := time.Now()
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: scpr,
})
r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: scpr})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err)
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil {
t.Fatal(err)
}
// Push a resolver update with no service config and a single address pointing
// to the grpclb server we created above. This will cause the channel to
// switch to the "grpclb" balancer, and will equally distribute RPCs across
// the backends as the fake grpclb server does not support load reporting from
// the clients.
now = time.Now()
// switch to the "grpclb" balancer, which returns a single backend address.
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}},
ServiceConfig: scpr,
})
if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err)
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil {
t.Fatal(err)
}
// Switch back to "round_robin".
now = time.Now()
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: scpr,
})
if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err)
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: scpr})
if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil {
t.Fatal(err)
}
}
@ -401,16 +323,12 @@ func (s) TestBalancerSwitch_grpclbNotRegistered(t *testing.T) {
// list to pick_first.
grpclbAddr := []resolver.Address{{Addr: "non-existent-grpclb-server-address", Type: resolver.GRPCLB}}
addrs = append(grpclbAddr, addrs...)
now := time.Now()
r.UpdateState(resolver.State{Addresses: addrs})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := checkPickFirst(ctx, cc, addrs[1].Addr); err != nil {
t.Fatal(err)
}
if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err)
}
// Push a resolver update with the same addresses, but with a service config
// specifying "round_robin". The ClientConn is expected to filter out the
@ -443,29 +361,21 @@ func (s) TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy(t *testing
// Push a resolver update containing no grpclb server address. This should
// lead to the channel using the default LB policy which is pick_first.
now := time.Now()
r.UpdateState(resolver.State{Addresses: addrs})
r.UpdateState(resolver.State{Addresses: addrs[1:]})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err)
}
if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil {
if err := checkPickFirst(ctx, cc, addrs[1].Addr); err != nil {
t.Fatal(err)
}
// Push a resolver update with no service config. The addresses list contains
// the stub backend addresses and a single address pointing to the grpclb
// server we created above. This will cause the channel to switch to the
// "grpclb" balancer, and will equally distribute RPCs across the backends.
now = time.Now()
// "grpclb" balancer, which returns a single backend address.
r.UpdateState(resolver.State{
Addresses: append(addrs, resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}),
Addresses: append(addrs[1:], resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}),
})
if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err)
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil {
t.Fatal(err)
}
@ -474,29 +384,19 @@ func (s) TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy(t *testing
// contains an address of type "grpclb". This should be preferred and hence
// there should be no balancer switch.
scpr := parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`)
now = time.Now()
r.UpdateState(resolver.State{
Addresses: append(addrs, resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}),
Addresses: append(addrs[1:], resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}),
ServiceConfig: scpr,
})
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if err := checkForTraceEvent(sCtx, wantRoundRobinTraceDesc, now); err == nil {
t.Fatal("channel switched balancers when expected not to")
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil {
t.Fatal(err)
}
// Switch to "round_robin" by removing the address of type "grpclb".
now = time.Now()
r.UpdateState(resolver.State{Addresses: addrs})
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
r.UpdateState(resolver.State{Addresses: addrs[1:]})
if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil {
t.Fatal(err)
}
if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err)
}
}
// TestBalancerSwitch_LoadBalancingConfigTrumps verifies that the
@ -518,31 +418,21 @@ func (s) TestBalancerSwitch_LoadBalancingConfigTrumps(t *testing.T) {
// Push a resolver update with no service config and a single address pointing
// to the grpclb server we created above. This will cause the channel to
// switch to the "grpclb" balancer, and will equally distribute RPCs across
// the backends as the fake grpclb server does not support load reporting from
// the clients.
now := time.Now()
// switch to the "grpclb" balancer, which returns a single backend address.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil {
t.Fatal(err)
}
if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err)
}
// Push a resolver update with the service config specifying "round_robin"
// through the recommended `loadBalancingConfig` field.
now = time.Now()
r.UpdateState(resolver.State{
Addresses: addrs,
Addresses: addrs[1:],
ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
})
if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err)
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil {
t.Fatal(err)
}
@ -553,14 +443,8 @@ func (s) TestBalancerSwitch_LoadBalancingConfigTrumps(t *testing.T) {
// switched. And because the `loadBalancingConfig` field trumps everything
// else, the address of type "grpclb" should be ignored.
grpclbAddr := resolver.Address{Addr: "non-existent-grpclb-server-address", Type: resolver.GRPCLB}
now = time.Now()
r.UpdateState(resolver.State{Addresses: append(addrs, grpclbAddr)})
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if err := checkForTraceEvent(sCtx, wantRoundRobinTraceDesc, now); err == nil {
t.Fatal("channel switched balancers when expected not to")
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
r.UpdateState(resolver.State{Addresses: append(addrs[1:], grpclbAddr)})
if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil {
t.Fatal(err)
}
}
@ -634,3 +518,88 @@ func (s) TestBalancerSwitch_OldBalancerCallsRemoveSubConnInClose(t *testing.T) {
case <-done:
}
}
// TestBalancerSwitch_Graceful tests the graceful switching of LB policies. It
// starts off by configuring "round_robin" on the channel and ensures that RPCs
// are successful. Then, it switches to a stub balancer which does not report a
// picker until instructed by the test to do so. At this point, the test
// verifies that RPCs are still successful using the old balancer. Then the test
// asks the new balancer to report a healthy picker and the test verifies that
// the RPCs get routed using the picker reported by the new balancer.
func (s) TestBalancerSwitch_Graceful(t *testing.T) {
backends, cleanup := startBackendsForBalancerSwitch(t)
defer cleanup()
addrs := stubBackendsToResolverAddrs(backends)
r := manual.NewBuilderWithScheme("whatever")
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
// Push a resolver update with the service config specifying "round_robin".
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
r.UpdateState(resolver.State{
Addresses: addrs[1:],
ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
})
if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil {
t.Fatal(err)
}
// Register a stub balancer which uses a "pick_first" balancer underneath and
// signals on a channel when it receives ClientConn updates. But it does not
// forward the ccUpdate to the underlying "pick_first" balancer until the test
// asks it to do so. This allows us to test the graceful switch functionality.
// Until the test asks the stub balancer to forward the ccUpdate, RPCs should
// get routed to the old balancer. And once the test gives the go ahead, RPCs
// should get routed to the new balancer.
ccUpdateCh := make(chan struct{})
waitToProceed := make(chan struct{})
stub.Register(t.Name(), stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
pf := balancer.Get(grpc.PickFirstBalancerName)
bd.Data = pf.Build(bd.ClientConn, bd.BuildOptions)
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
bal := bd.Data.(balancer.Balancer)
close(ccUpdateCh)
go func() {
<-waitToProceed
bal.UpdateClientConnState(ccs)
}()
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
bal := bd.Data.(balancer.Balancer)
bal.UpdateSubConnState(sc, state)
},
})
// Push a resolver update with the service config specifying our stub
// balancer. We should see a trace event for this balancer switch. But RPCs
// should still be routed to the old balancer since our stub balancer does not
// report a ready picker until we ask it to do so.
r.UpdateState(resolver.State{
Addresses: addrs[:1],
ServiceConfig: r.CC.ParseServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%v": {}}]}`, t.Name())),
})
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for a ClientConnState update on the new balancer")
case <-ccUpdateCh:
}
if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil {
t.Fatal(err)
}
// Ask our stub balancer to forward the earlier received ccUpdate to the
// underlying "pick_first" balancer which will result in a healthy picker
// being reported to the channel. RPCs should start using the new balancer.
close(waitToProceed)
if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil {
t.Fatal(err)
}
}

View File

@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
@ -127,54 +128,6 @@ func (s) TestResolverUpdate_InvalidServiceConfigAsFirstUpdate(t *testing.T) {
}
}
// The wrappingBalancer wraps a pick_first balancer and writes to a channel when
// it receives a ClientConn update. This is different to a stub balancer which
// only notifies of updates from grpc, but does not contain a real balancer.
//
// The wrappingBalancer allows us to write tests with a real backend and make
// real RPCs.
type wrappingBalancerBuilder struct {
name string
updateCh *testutils.Channel
}
func (bb wrappingBalancerBuilder) Name() string { return bb.name }
func (bb wrappingBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
pf := balancer.Get(grpc.PickFirstBalancerName)
b := &wrappingBalancer{
Balancer: pf.Build(cc, opts),
updateCh: bb.updateCh,
}
return b
}
func (bb wrappingBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
cfg := &wrappingBalancerConfig{}
if err := json.Unmarshal(c, cfg); err != nil {
return nil, err
}
return cfg, nil
}
type wrappingBalancer struct {
balancer.Balancer
updateCh *testutils.Channel
}
func (b *wrappingBalancer) UpdateClientConnState(c balancer.ClientConnState) error {
if _, ok := c.BalancerConfig.(*wrappingBalancerConfig); !ok {
return fmt.Errorf("received balancer config of unsupported type %T", c.BalancerConfig)
}
b.updateCh.Send(c)
return b.Balancer.UpdateClientConnState(c)
}
type wrappingBalancerConfig struct {
serviceconfig.LoadBalancingConfig
Config string `json:"config,omitempty"`
}
func verifyClientConnStateUpdate(got, want balancer.ClientConnState) error {
if got, want := got.ResolverState.Addresses, want.ResolverState.Addresses; !cmp.Equal(got, want) {
return fmt.Errorf("update got unexpected addresses: %v, want %v", got, want)
@ -193,11 +146,38 @@ func verifyClientConnStateUpdate(got, want balancer.ClientConnState) error {
// having sent a good update. This should result in the ClientConn discarding
// the new invalid service config, and continuing to use the old good config.
func (s) TestResolverUpdate_InvalidServiceConfigAfterGoodUpdate(t *testing.T) {
// Register a wrapper balancer to get notified of ClientConn updates.
ccsCh := testutils.NewChannel()
balancer.Register(wrappingBalancerBuilder{
name: t.Name(),
updateCh: ccsCh,
type wrappingBalancerConfig struct {
serviceconfig.LoadBalancingConfig
Config string `json:"config,omitempty"`
}
// Register a stub balancer which uses a "pick_first" balancer underneath and
// signals on a channel when it receives ClientConn updates.
ccUpdateCh := testutils.NewChannel()
stub.Register(t.Name(), stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
pf := balancer.Get(grpc.PickFirstBalancerName)
bd.Data = pf.Build(bd.ClientConn, bd.BuildOptions)
},
ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
cfg := &wrappingBalancerConfig{}
if err := json.Unmarshal(lbCfg, cfg); err != nil {
return nil, err
}
return cfg, nil
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
if _, ok := ccs.BalancerConfig.(*wrappingBalancerConfig); !ok {
return fmt.Errorf("received balancer config of unsupported type %T", ccs.BalancerConfig)
}
bal := bd.Data.(balancer.Balancer)
ccUpdateCh.Send(ccs)
return bal.UpdateClientConnState(ccs)
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
bal := bd.Data.(balancer.Balancer)
bal.UpdateSubConnState(sc, state)
},
})
// Start a backend exposing the test service.
@ -242,7 +222,7 @@ func (s) TestResolverUpdate_InvalidServiceConfigAfterGoodUpdate(t *testing.T) {
},
BalancerConfig: &wrappingBalancerConfig{Config: lbCfg},
}
ccs, err := ccsCh.Receive(ctx)
ccs, err := ccUpdateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout when waiting for ClientConnState update from grpc")
}
@ -263,7 +243,7 @@ func (s) TestResolverUpdate_InvalidServiceConfigAfterGoodUpdate(t *testing.T) {
badSC := r.CC.ParseServiceConfig("bad json service config")
wantCCS.ResolverState.ServiceConfig = badSC
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: badSC})
ccs, err = ccsCh.Receive(ctx)
ccs, err = ccUpdateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout when waiting for ClientConnState update from grpc")
}

View File

@ -45,7 +45,6 @@ const rrServiceConfig = `{"loadBalancingConfig": [{"round_robin":{}}]}`
func checkRoundRobin(ctx context.Context, cc *grpc.ClientConn, addrs []resolver.Address) error {
client := testgrpc.NewTestServiceClient(cc)
var peer peer.Peer
// Make sure connections to all backends are up.
backendCount := len(addrs)
for i := 0; i < backendCount; i++ {
@ -54,6 +53,7 @@ func checkRoundRobin(ctx context.Context, cc *grpc.ClientConn, addrs []resolver.
if ctx.Err() != nil {
return fmt.Errorf("timeout waiting for connection to %q to be up", addrs[i].Addr)
}
var peer peer.Peer
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
// Some tests remove backends and check if round robin is happening
// across the remaining backends. In such cases, RPCs can initially fail
@ -69,10 +69,11 @@ func checkRoundRobin(ctx context.Context, cc *grpc.ClientConn, addrs []resolver.
}
// Make sure RPCs are sent to all backends.
for i := 0; i < 3*backendCount; i++ {
var peer peer.Peer
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
return fmt.Errorf("EmptyCall() = %v, want <nil>", err)
}
if gotPeer, wantPeer := addrs[i%backendCount].Addr, peer.Addr.String(); gotPeer != wantPeer {
if gotPeer, wantPeer := peer.Addr.String(), addrs[i%backendCount].Addr; gotPeer != wantPeer {
return fmt.Errorf("rpc sent to peer %q, want peer %q", gotPeer, wantPeer)
}
}