balancergroup: Add trigger point to gracefully switch a child (#5251)
* balancergroup: Add trigger point to gracefully switch a child
This commit is contained in:
@ -160,6 +160,20 @@ func (sbc *subBalancerWrapper) resolverError(err error) {
|
|||||||
b.ResolverError(err)
|
b.ResolverError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sbc *subBalancerWrapper) gracefulSwitch(builder balancer.Builder) {
|
||||||
|
sbc.builder = builder
|
||||||
|
b := sbc.balancer
|
||||||
|
// Even if you get an add and it persists builder but doesn't start
|
||||||
|
// balancer, this would leave graceful switch being nil, in which we are
|
||||||
|
// correctly overwriting with the recent builder here as well to use later.
|
||||||
|
// The graceful switch balancer's presence is an invariant of whether the
|
||||||
|
// balancer group is closed or not (if closed, nil, if started, present).
|
||||||
|
if sbc.balancer != nil {
|
||||||
|
sbc.group.logger.Infof("Switching child policy %v to type %v", sbc.id, sbc.builder.Name())
|
||||||
|
b.SwitchTo(sbc.builder)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (sbc *subBalancerWrapper) stopBalancer() {
|
func (sbc *subBalancerWrapper) stopBalancer() {
|
||||||
sbc.balancer.Close()
|
sbc.balancer.Close()
|
||||||
sbc.balancer = nil
|
sbc.balancer = nil
|
||||||
@ -332,6 +346,23 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
|
|||||||
bg.outgoingMu.Unlock()
|
bg.outgoingMu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateBuilder updates the builder for a current child, starting the Graceful
|
||||||
|
// Switch process for that child.
|
||||||
|
func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) {
|
||||||
|
bg.outgoingMu.Lock()
|
||||||
|
// This does not deal with the balancer cache because this call should come
|
||||||
|
// after an Add call for a given child balancer. If the child is removed,
|
||||||
|
// the caller will call Add if the child balancer comes back which would
|
||||||
|
// then deal with the balancer cache.
|
||||||
|
sbc := bg.idToBalancerConfig[id]
|
||||||
|
if sbc == nil {
|
||||||
|
// simply ignore it if not present, don't error
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sbc.gracefulSwitch(builder)
|
||||||
|
bg.outgoingMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
// Remove removes the balancer with id from the group.
|
// Remove removes the balancer with id from the group.
|
||||||
//
|
//
|
||||||
// But doesn't close the balancer. The balancer is kept in a cache, and will be
|
// But doesn't close the balancer. The balancer is kept in a cache, and will be
|
||||||
|
@ -17,11 +17,13 @@
|
|||||||
package balancergroup
|
package balancergroup
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/balancer"
|
"google.golang.org/grpc/balancer"
|
||||||
"google.golang.org/grpc/balancer/roundrobin"
|
"google.golang.org/grpc/balancer/roundrobin"
|
||||||
"google.golang.org/grpc/balancer/weightedtarget/weightedaggregator"
|
"google.golang.org/grpc/balancer/weightedtarget/weightedaggregator"
|
||||||
@ -34,6 +36,11 @@ import (
|
|||||||
"google.golang.org/grpc/resolver"
|
"google.golang.org/grpc/resolver"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultTestTimeout = 5 * time.Second
|
||||||
|
defaultTestShortTimeout = 10 * time.Millisecond
|
||||||
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
rrBuilder = balancer.Get(roundrobin.Name)
|
rrBuilder = balancer.Get(roundrobin.Name)
|
||||||
testBalancerIDs = []string{"b1", "b2", "b3"}
|
testBalancerIDs = []string{"b1", "b2", "b3"}
|
||||||
@ -534,3 +541,138 @@ func (s) TestBalancerExitIdleOne(t *testing.T) {
|
|||||||
case <-exitIdleCh:
|
case <-exitIdleCh:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestBalancerGracefulSwitch tests the graceful switch functionality for a
|
||||||
|
// child of the balancer group. At first, the child is configured as a round
|
||||||
|
// robin load balancer, and thus should behave accordingly. The test then
|
||||||
|
// gracefully switches this child to a custom type which only creates a SubConn
|
||||||
|
// for the second passed in address and also only picks that created SubConn.
|
||||||
|
// The new aggregated picker should reflect this change for the child.
|
||||||
|
func (s) TestBalancerGracefulSwitch(t *testing.T) {
|
||||||
|
cc := testutils.NewTestClientConn(t)
|
||||||
|
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
|
||||||
|
gator.Start()
|
||||||
|
bg := New(cc, balancer.BuildOptions{}, gator, nil)
|
||||||
|
gator.Add(testBalancerIDs[0], 1)
|
||||||
|
bg.Add(testBalancerIDs[0], rrBuilder)
|
||||||
|
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
|
||||||
|
|
||||||
|
bg.Start()
|
||||||
|
|
||||||
|
m1 := make(map[resolver.Address]balancer.SubConn)
|
||||||
|
scs := make(map[balancer.SubConn]bool)
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
addrs := <-cc.NewSubConnAddrsCh
|
||||||
|
sc := <-cc.NewSubConnCh
|
||||||
|
m1[addrs[0]] = sc
|
||||||
|
scs[sc] = true
|
||||||
|
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
|
||||||
|
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
|
||||||
|
}
|
||||||
|
|
||||||
|
p1 := <-cc.NewPickerCh
|
||||||
|
want := []balancer.SubConn{
|
||||||
|
m1[testBackendAddrs[0]], m1[testBackendAddrs[1]],
|
||||||
|
}
|
||||||
|
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
|
||||||
|
t.Fatalf("want %v, got %v", want, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The balancer type for testBalancersIDs[0] is currently Round Robin. Now,
|
||||||
|
// change it to a balancer that has separate behavior logically (creating
|
||||||
|
// SubConn for second address in address list and always picking that
|
||||||
|
// SubConn), and see if the downstream behavior reflects that change.
|
||||||
|
bg.UpdateBuilder(testBalancerIDs[0], wrappedPickFirstBalancerBuilder{})
|
||||||
|
if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}); err != nil {
|
||||||
|
t.Fatalf("error updating ClientConn state: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
addrs := <-cc.NewSubConnAddrsCh
|
||||||
|
if addrs[0].Addr != testBackendAddrs[3].Addr {
|
||||||
|
// Verifies forwarded to new created balancer, as the wrapped pick first
|
||||||
|
// balancer will delete first address.
|
||||||
|
t.Fatalf("newSubConn called with wrong address, want: %v, got : %v", testBackendAddrs[3].Addr, addrs[0].Addr)
|
||||||
|
}
|
||||||
|
sc := <-cc.NewSubConnCh
|
||||||
|
|
||||||
|
// Update the pick first balancers SubConn as CONNECTING. This will cause
|
||||||
|
// the pick first balancer to UpdateState() with CONNECTING, which shouldn't send
|
||||||
|
// a Picker update back, as the Graceful Switch process is not complete.
|
||||||
|
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
|
||||||
|
defer cancel()
|
||||||
|
select {
|
||||||
|
case <-cc.NewPickerCh:
|
||||||
|
t.Fatalf("No new picker should have been sent due to the Graceful Switch process not completing")
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the pick first balancers SubConn as READY. This will cause
|
||||||
|
// the pick first balancer to UpdateState() with READY, which should send a
|
||||||
|
// Picker update back, as the Graceful Switch process is complete. This
|
||||||
|
// Picker should always pick the pick first's created SubConn which
|
||||||
|
// corresponds to address 3.
|
||||||
|
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
|
||||||
|
p2 := <-cc.NewPickerCh
|
||||||
|
pr, err := p2.Pick(balancer.PickInfo{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error picking: %v", err)
|
||||||
|
}
|
||||||
|
if pr.SubConn != sc {
|
||||||
|
t.Fatalf("picker.Pick(), want %v, got %v", sc, pr.SubConn)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The Graceful Switch process completing for the child should cause the
|
||||||
|
// SubConns for the balancer being gracefully switched from to get deleted.
|
||||||
|
ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||||
|
defer cancel()
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
t.Fatalf("error waiting for RemoveSubConn()")
|
||||||
|
case sc := <-cc.RemoveSubConnCh:
|
||||||
|
// The SubConn removed should have been one of the two created
|
||||||
|
// SubConns, and both should be deleted.
|
||||||
|
if ok := scs[sc]; ok {
|
||||||
|
delete(scs, sc)
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
t.Fatalf("RemoveSubConn called for wrong SubConn %v, want in %v", sc, scs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type wrappedPickFirstBalancerBuilder struct{}
|
||||||
|
|
||||||
|
func (wrappedPickFirstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
|
||||||
|
builder := balancer.Get(grpc.PickFirstBalancerName)
|
||||||
|
wpfb := &wrappedPickFirstBalancer{
|
||||||
|
ClientConn: cc,
|
||||||
|
}
|
||||||
|
pf := builder.Build(wpfb, opts)
|
||||||
|
wpfb.Balancer = pf
|
||||||
|
return wpfb
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wrappedPickFirstBalancerBuilder) Name() string {
|
||||||
|
return "wrappedPickFirstBalancer"
|
||||||
|
}
|
||||||
|
|
||||||
|
type wrappedPickFirstBalancer struct {
|
||||||
|
balancer.Balancer
|
||||||
|
balancer.ClientConn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wb *wrappedPickFirstBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
|
||||||
|
s.ResolverState.Addresses = s.ResolverState.Addresses[1:]
|
||||||
|
return wb.Balancer.UpdateClientConnState(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wb *wrappedPickFirstBalancer) UpdateState(state balancer.State) {
|
||||||
|
// Eat it if IDLE - allows it to switch over only on a READY SubConn.
|
||||||
|
if state.ConnectivityState == connectivity.Idle {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
wb.ClientConn.UpdateState(state)
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user