From 3a74cd52c862325614b7456e1cb22be1ae4d4b8c Mon Sep 17 00:00:00 2001 From: Zach Reyes <39203661+zasweq@users.noreply.github.com> Date: Tue, 22 Mar 2022 18:58:13 -0400 Subject: [PATCH] balancergroup: Add trigger point to gracefully switch a child (#5251) * balancergroup: Add trigger point to gracefully switch a child --- internal/balancergroup/balancergroup.go | 31 ++++ internal/balancergroup/balancergroup_test.go | 142 +++++++++++++++++++ 2 files changed, 173 insertions(+) diff --git a/internal/balancergroup/balancergroup.go b/internal/balancergroup/balancergroup.go index 4f089fdf..3daad144 100644 --- a/internal/balancergroup/balancergroup.go +++ b/internal/balancergroup/balancergroup.go @@ -160,6 +160,20 @@ func (sbc *subBalancerWrapper) resolverError(err error) { b.ResolverError(err) } +func (sbc *subBalancerWrapper) gracefulSwitch(builder balancer.Builder) { + sbc.builder = builder + b := sbc.balancer + // Even if you get an add and it persists builder but doesn't start + // balancer, this would leave graceful switch being nil, in which we are + // correctly overwriting with the recent builder here as well to use later. + // The graceful switch balancer's presence is an invariant of whether the + // balancer group is closed or not (if closed, nil, if started, present). + if sbc.balancer != nil { + sbc.group.logger.Infof("Switching child policy %v to type %v", sbc.id, sbc.builder.Name()) + b.SwitchTo(sbc.builder) + } +} + func (sbc *subBalancerWrapper) stopBalancer() { sbc.balancer.Close() sbc.balancer = nil @@ -332,6 +346,23 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) { bg.outgoingMu.Unlock() } +// UpdateBuilder updates the builder for a current child, starting the Graceful +// Switch process for that child. +func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) { + bg.outgoingMu.Lock() + // This does not deal with the balancer cache because this call should come + // after an Add call for a given child balancer. If the child is removed, + // the caller will call Add if the child balancer comes back which would + // then deal with the balancer cache. + sbc := bg.idToBalancerConfig[id] + if sbc == nil { + // simply ignore it if not present, don't error + return + } + sbc.gracefulSwitch(builder) + bg.outgoingMu.Unlock() +} + // Remove removes the balancer with id from the group. // // But doesn't close the balancer. The balancer is kept in a cache, and will be diff --git a/internal/balancergroup/balancergroup_test.go b/internal/balancergroup/balancergroup_test.go index d8a5a1c1..d962faa0 100644 --- a/internal/balancergroup/balancergroup_test.go +++ b/internal/balancergroup/balancergroup_test.go @@ -17,11 +17,13 @@ package balancergroup import ( + "context" "fmt" "testing" "time" "github.com/google/go-cmp/cmp" + "google.golang.org/grpc" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/balancer/weightedtarget/weightedaggregator" @@ -34,6 +36,11 @@ import ( "google.golang.org/grpc/resolver" ) +const ( + defaultTestTimeout = 5 * time.Second + defaultTestShortTimeout = 10 * time.Millisecond +) + var ( rrBuilder = balancer.Get(roundrobin.Name) testBalancerIDs = []string{"b1", "b2", "b3"} @@ -534,3 +541,138 @@ func (s) TestBalancerExitIdleOne(t *testing.T) { case <-exitIdleCh: } } + +// TestBalancerGracefulSwitch tests the graceful switch functionality for a +// child of the balancer group. At first, the child is configured as a round +// robin load balancer, and thus should behave accordingly. The test then +// gracefully switches this child to a custom type which only creates a SubConn +// for the second passed in address and also only picks that created SubConn. +// The new aggregated picker should reflect this change for the child. +func (s) TestBalancerGracefulSwitch(t *testing.T) { + cc := testutils.NewTestClientConn(t) + gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR) + gator.Start() + bg := New(cc, balancer.BuildOptions{}, gator, nil) + gator.Add(testBalancerIDs[0], 1) + bg.Add(testBalancerIDs[0], rrBuilder) + bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}}) + + bg.Start() + + m1 := make(map[resolver.Address]balancer.SubConn) + scs := make(map[balancer.SubConn]bool) + for i := 0; i < 2; i++ { + addrs := <-cc.NewSubConnAddrsCh + sc := <-cc.NewSubConnCh + m1[addrs[0]] = sc + scs[sc] = true + bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + } + + p1 := <-cc.NewPickerCh + want := []balancer.SubConn{ + m1[testBackendAddrs[0]], m1[testBackendAddrs[1]], + } + if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil { + t.Fatalf("want %v, got %v", want, err) + } + + // The balancer type for testBalancersIDs[0] is currently Round Robin. Now, + // change it to a balancer that has separate behavior logically (creating + // SubConn for second address in address list and always picking that + // SubConn), and see if the downstream behavior reflects that change. + bg.UpdateBuilder(testBalancerIDs[0], wrappedPickFirstBalancerBuilder{}) + if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}); err != nil { + t.Fatalf("error updating ClientConn state: %v", err) + } + + addrs := <-cc.NewSubConnAddrsCh + if addrs[0].Addr != testBackendAddrs[3].Addr { + // Verifies forwarded to new created balancer, as the wrapped pick first + // balancer will delete first address. + t.Fatalf("newSubConn called with wrong address, want: %v, got : %v", testBackendAddrs[3].Addr, addrs[0].Addr) + } + sc := <-cc.NewSubConnCh + + // Update the pick first balancers SubConn as CONNECTING. This will cause + // the pick first balancer to UpdateState() with CONNECTING, which shouldn't send + // a Picker update back, as the Graceful Switch process is not complete. + bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + select { + case <-cc.NewPickerCh: + t.Fatalf("No new picker should have been sent due to the Graceful Switch process not completing") + case <-ctx.Done(): + } + + // Update the pick first balancers SubConn as READY. This will cause + // the pick first balancer to UpdateState() with READY, which should send a + // Picker update back, as the Graceful Switch process is complete. This + // Picker should always pick the pick first's created SubConn which + // corresponds to address 3. + bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + p2 := <-cc.NewPickerCh + pr, err := p2.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("error picking: %v", err) + } + if pr.SubConn != sc { + t.Fatalf("picker.Pick(), want %v, got %v", sc, pr.SubConn) + } + + // The Graceful Switch process completing for the child should cause the + // SubConns for the balancer being gracefully switched from to get deleted. + ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + for i := 0; i < 2; i++ { + select { + case <-ctx.Done(): + t.Fatalf("error waiting for RemoveSubConn()") + case sc := <-cc.RemoveSubConnCh: + // The SubConn removed should have been one of the two created + // SubConns, and both should be deleted. + if ok := scs[sc]; ok { + delete(scs, sc) + continue + } else { + t.Fatalf("RemoveSubConn called for wrong SubConn %v, want in %v", sc, scs) + } + } + } +} + +type wrappedPickFirstBalancerBuilder struct{} + +func (wrappedPickFirstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + builder := balancer.Get(grpc.PickFirstBalancerName) + wpfb := &wrappedPickFirstBalancer{ + ClientConn: cc, + } + pf := builder.Build(wpfb, opts) + wpfb.Balancer = pf + return wpfb +} + +func (wrappedPickFirstBalancerBuilder) Name() string { + return "wrappedPickFirstBalancer" +} + +type wrappedPickFirstBalancer struct { + balancer.Balancer + balancer.ClientConn +} + +func (wb *wrappedPickFirstBalancer) UpdateClientConnState(s balancer.ClientConnState) error { + s.ResolverState.Addresses = s.ResolverState.Addresses[1:] + return wb.Balancer.UpdateClientConnState(s) +} + +func (wb *wrappedPickFirstBalancer) UpdateState(state balancer.State) { + // Eat it if IDLE - allows it to switch over only on a READY SubConn. + if state.ConnectivityState == connectivity.Idle { + return + } + wb.ClientConn.UpdateState(state) +}