From 4745f6ae0ddd06cf0892fd56a3ca478af1f70b6a Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 2 Apr 2019 16:27:52 -0700 Subject: [PATCH] grpclb: fallback after init (#2681) regenerate picker when switching between fallback/non-fallback, because new SubConn state might not be updated for cached SubConns --- balancer/grpclb/grpclb.go | 97 +++++++++++++++++------ balancer/grpclb/grpclb_remote_balancer.go | 42 +++++++--- balancer/grpclb/grpclb_test.go | 80 ++++++++++++++++--- balancer/grpclb/grpclb_test_util_test.go | 85 ++++++++++++++++++++ 4 files changed, 253 insertions(+), 51 deletions(-) create mode 100644 balancer/grpclb/grpclb_test_util_test.go diff --git a/balancer/grpclb/grpclb.go b/balancer/grpclb/grpclb.go index e385d02d..a1123ced 100644 --- a/balancer/grpclb/grpclb.go +++ b/balancer/grpclb/grpclb.go @@ -172,7 +172,6 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal doneCh: make(chan struct{}), manualResolver: r, - csEvltr: &balancer.ConnectivityStateEvaluator{}, subConns: make(map[resolver.Address]balancer.SubConn), scStates: make(map[balancer.SubConn]connectivity.State), picker: &errPicker{err: balancer.ErrNoSubConnAvailable}, @@ -238,15 +237,15 @@ type lbBalancer struct { // but with only READY SCs will be gerenated. backendAddrs []resolver.Address // Roundrobin functionalities. - csEvltr *balancer.ConnectivityStateEvaluator state connectivity.State subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn. scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns. picker balancer.Picker // Support fallback to resolved backend addresses if there's no response // from remote balancer within fallbackTimeout. - fallbackTimerExpired bool - serverListReceived bool + remoteBalancerConnected bool + serverListReceived bool + inFallback bool // resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set // when resolved address updates are received, and read in the goroutine // handling fallback. @@ -264,13 +263,16 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) { return } + if lb.state == connectivity.Connecting { + lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable} + return + } + var readySCs []balancer.SubConn if lb.usePickFirst { - if lb.state == connectivity.Ready || lb.state == connectivity.Idle { - for _, sc := range lb.subConns { - readySCs = append(readySCs, sc) - break - } + for _, sc := range lb.subConns { + readySCs = append(readySCs, sc) + break } } else { for _, a := range lb.backendAddrs { @@ -286,10 +288,13 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) { // If there's no ready SubConns, always re-pick. This is to avoid drops // unless at least one SubConn is ready. Otherwise we may drop more // often than want because of drops + re-picks(which become re-drops). + // + // This doesn't seem to be necessary after the connecting check above. + // Kept for safety. lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable} return } - if len(lb.fullServerList) <= 0 { + if lb.inFallback { lb.picker = newRRPicker(readySCs) return } @@ -305,6 +310,34 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) { prevLBPicker.updateReadySCs(readySCs) } +// aggregateSubConnStats calculate the aggregated state of SubConns in +// lb.SubConns. These SubConns are subconns in use (when switching between +// fallback and grpclb). lb.scState contains states for all SubConns, including +// those in cache (SubConns are cached for 10 seconds after remove). 
+// +// The aggregated state is: +// - If at least one SubConn in Ready, the aggregated state is Ready; +// - Else if at least one SubConn in Connecting, the aggregated state is Connecting; +// - Else the aggregated state is TransientFailure. +func (lb *lbBalancer) aggregateSubConnStates() connectivity.State { + var numConnecting uint64 + + for _, sc := range lb.subConns { + if state, ok := lb.scStates[sc]; ok { + switch state { + case connectivity.Ready: + return connectivity.Ready + case connectivity.Connecting: + numConnecting++ + } + } + } + if numConnecting > 0 { + return connectivity.Connecting + } + return connectivity.TransientFailure +} + func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { if grpclog.V(2) { grpclog.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s) @@ -328,18 +361,33 @@ func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivi // kept the sc's state in scStates. Remove state for this sc here. delete(lb.scStates, sc) } - - oldAggrState := lb.state - lb.state = lb.csEvltr.RecordTransition(oldS, s) - - // Regenerate picker when one of the following happens: + // Force regenerate picker if // - this sc became ready from not-ready // - this sc became not-ready from ready - // - the aggregated state of balancer became TransientFailure from non-TransientFailure - // - the aggregated state of balancer became non-TransientFailure from TransientFailure - if (oldS == connectivity.Ready) != (s == connectivity.Ready) || - (lb.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) { - lb.regeneratePicker(false) + lb.updateStateAndPicker((oldS == connectivity.Ready) != (s == connectivity.Ready), false) + + // Enter fallback when the aggregated state is not Ready and the connection + // to remote balancer is lost. + if lb.state != connectivity.Ready { + if !lb.inFallback && !lb.remoteBalancerConnected { + // Enter fallback. + lb.refreshSubConns(lb.resolvedBackendAddrs, false) + } + } +} + +// updateStateAndPicker re-calculate the aggregated state, and regenerate picker +// if overall state is changed. +// +// If forceRegeneratePicker is true, picker will be regenerated. +func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop bool) { + oldAggrState := lb.state + lb.state = lb.aggregateSubConnStates() + // Regenerate picker when one of the following happens: + // - caller wants to regenerate + // - the aggregated state changed + if forceRegeneratePicker || (lb.state != oldAggrState) { + lb.regeneratePicker(resetDrop) } lb.cc.UpdateBalancerState(lb.state, lb.picker) @@ -357,11 +405,11 @@ func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) { return } lb.mu.Lock() - if lb.serverListReceived { + if lb.inFallback || lb.serverListReceived { lb.mu.Unlock() return } - lb.fallbackTimerExpired = true + // Enter fallback. lb.refreshSubConns(lb.resolvedBackendAddrs, false) lb.mu.Unlock() } @@ -405,10 +453,7 @@ func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { lb.mu.Lock() lb.resolvedBackendAddrs = backendAddrs - // If serverListReceived is true, connection to remote balancer was - // successful and there's no need to do fallback anymore. - // If fallbackTimerExpired is false, fallback hasn't happened yet. - if !lb.serverListReceived && lb.fallbackTimerExpired { + if lb.inFallback { // This means we received a new list of resolved backends, and we are // still in fallback mode. 
Need to update the list of backends we are // using to the new list of backends. diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index e7e92708..7ed886f0 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -85,24 +85,26 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) { backendAddrs = append(backendAddrs, addr) } - // Call refreshSubConns to create/remove SubConns. + // Call refreshSubConns to create/remove SubConns. If we are in fallback, + // this is also exiting fallback. lb.refreshSubConns(backendAddrs, true) - // Regenerate and update picker no matter if there's update on backends (if - // any SubConn will be newed/removed). Because since the full serverList was - // different, there might be updates in drops or pick weights(different - // number of duplicates). We need to update picker with the fulllist. - // - // Now with cache, even if SubConn was newed/removed, there might be no - // state changes. - lb.regeneratePicker(true) - lb.cc.UpdateBalancerState(lb.state, lb.picker) } -// refreshSubConns creates/removes SubConns with backendAddrs. It returns a bool -// indicating whether the backendAddrs are different from the cached -// backendAddrs (whether any SubConn was newed/removed). +// refreshSubConns creates/removes SubConns with backendAddrs, and refreshes +// balancer state and picker. +// // Caller must hold lb.mu. func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fromGRPCLBServer bool) { + defer func() { + // Regenerate and update picker after refreshing subconns because with + // cache, even if SubConn was newed/removed, there might be no state + // changes (the subconn will be kept in cache, not actually + // newed/removed). + lb.updateStateAndPicker(true, true) + }() + + lb.inFallback = !fromGRPCLBServer + opts := balancer.NewSubConnOptions{} if fromGRPCLBServer { opts.CredsBundle = lb.grpclbBackendCreds @@ -218,6 +220,9 @@ func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) { if err != nil { return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err) } + lb.mu.Lock() + lb.remoteBalancerConnected = true + lb.mu.Unlock() // grpclb handshake on the stream. initReq := &lbpb.LoadBalanceRequest{ @@ -270,6 +275,17 @@ func (lb *lbBalancer) watchRemoteBalancer() { // Trigger a re-resolve when the stream errors. lb.cc.cc.ResolveNow(resolver.ResolveNowOption{}) + lb.mu.Lock() + lb.remoteBalancerConnected = false + lb.fullServerList = nil + // Enter fallback when connection to remote balancer is lost, and the + // aggregated state is not Ready. + if !lb.inFallback && lb.state != connectivity.Ready { + // Entering fallback. 
+ lb.refreshSubConns(lb.resolvedBackendAddrs, false) + } + lb.mu.Unlock() + if !doBackoff { retryCount = 0 continue diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go index 0cdb5c37..2f4e3e01 100644 --- a/balancer/grpclb/grpclb_test.go +++ b/balancer/grpclb/grpclb_test.go @@ -230,18 +230,21 @@ func (b *remoteBalancer) BalanceLoad(stream lbgrpc.LoadBalancer_BalanceLoadServe b.stats.merge(req.GetClientStats()) } }() - for v := range b.sls { - resp = &lbpb.LoadBalanceResponse{ - LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{ - ServerList: v, - }, + for { + select { + case v := <-b.sls: + resp = &lbpb.LoadBalanceResponse{ + LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{ + ServerList: v, + }, + } + case <-stream.Context().Done(): + return stream.Context().Err() } if err := stream.Send(resp); err != nil { return err } } - <-b.done - return nil } type testServer struct { @@ -297,6 +300,9 @@ type testServers struct { backends []*grpc.Server beIPs []net.IP bePorts []int + + lbListener net.Listener + beListeners []net.Listener } func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), err error) { @@ -317,7 +323,7 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er beIPs = append(beIPs, beLis.Addr().(*net.TCPAddr).IP) bePorts = append(bePorts, beLis.Addr().(*net.TCPAddr).Port) - beListeners = append(beListeners, beLis) + beListeners = append(beListeners, newRestartableListener(beLis)) } backends := startBackends(beServerName, false, beListeners...) @@ -327,6 +333,7 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er err = fmt.Errorf("failed to create the listener for the load balancer %v", err) return } + lbLis = newRestartableListener(lbLis) lbCreds := &serverNameCheckCreds{ sn: lbServerName, } @@ -344,6 +351,9 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er backends: backends, beIPs: beIPs, bePorts: bePorts, + + lbListener: lbLis, + beListeners: beListeners, } cleanup = func() { defer stopBackends(backends) @@ -712,7 +722,7 @@ func TestFallback(t *testing.T) { testC := testpb.NewTestServiceClient(cc) r.UpdateState(resolver.State{Addresses: []resolver.Address{{ - Addr: "", + Addr: "invalid.address", Type: resolver.GRPCLB, ServerName: lbServerName, }, { @@ -723,7 +733,7 @@ func TestFallback(t *testing.T) { var p peer.Peer if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { - t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) + t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, ", err) } if p.Addr.String() != beLis.Addr().String() { t.Fatalf("got peer: %v, want peer: %v", p.Addr, beLis.Addr()) @@ -739,16 +749,62 @@ func TestFallback(t *testing.T) { ServerName: beServerName, }}}) + var backendUsed bool for i := 0; i < 1000; i++ { if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] { - return + backendUsed = true + break } time.Sleep(time.Millisecond) } - t.Fatalf("No RPC sent to backend behind remote balancer after 1 second") + if !backendUsed { + t.Fatalf("No RPC sent to backend behind remote balancer after 1 second") + } + + // Close backend and remote balancer connections, should use fallback. 
+ tss.beListeners[0].(*restartableListener).stopPreviousConns() + tss.lbListener.(*restartableListener).stopPreviousConns() + time.Sleep(time.Second) + + var fallbackUsed bool + for i := 0; i < 1000; i++ { + if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) + } + if p.Addr.String() == beLis.Addr().String() { + fallbackUsed = true + break + } + time.Sleep(time.Millisecond) + } + if !fallbackUsed { + t.Fatalf("No RPC sent to fallback after 1 second") + } + + // Restart backend and remote balancer, should not use backends. + tss.beListeners[0].(*restartableListener).restart() + tss.lbListener.(*restartableListener).restart() + tss.ls.sls <- sl + + time.Sleep(time.Second) + + var backendUsed2 bool + for i := 0; i < 1000; i++ { + if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) + } + if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] { + backendUsed2 = true + break + } + time.Sleep(time.Millisecond) + } + if !backendUsed2 { + t.Fatalf("No RPC sent to backend behind remote balancer after 1 second") + } } // The remote balancer sends response with duplicates to grpclb client. diff --git a/balancer/grpclb/grpclb_test_util_test.go b/balancer/grpclb/grpclb_test_util_test.go new file mode 100644 index 00000000..5d3e6ba7 --- /dev/null +++ b/balancer/grpclb/grpclb_test_util_test.go @@ -0,0 +1,85 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpclb + +import ( + "net" + "sync" +) + +type tempError struct{} + +func (*tempError) Error() string { + return "grpclb test temporary error" +} +func (*tempError) Temporary() bool { + return true +} + +type restartableListener struct { + net.Listener + addr string + + mu sync.Mutex + closed bool + conns []net.Conn +} + +func newRestartableListener(l net.Listener) *restartableListener { + return &restartableListener{ + Listener: l, + addr: l.Addr().String(), + } +} + +func (l *restartableListener) Accept() (conn net.Conn, err error) { + conn, err = l.Listener.Accept() + if err == nil { + l.mu.Lock() + if l.closed { + conn.Close() + l.mu.Unlock() + return nil, &tempError{} + } + l.conns = append(l.conns, conn) + l.mu.Unlock() + } + return +} + +func (l *restartableListener) Close() error { + return l.Listener.Close() +} + +func (l *restartableListener) stopPreviousConns() { + l.mu.Lock() + l.closed = true + tmp := l.conns + l.conns = nil + l.mu.Unlock() + for _, conn := range tmp { + conn.Close() + } +} + +func (l *restartableListener) restart() { + l.mu.Lock() + l.closed = false + l.mu.Unlock() +}
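
To make the fallback behavior this patch introduces easier to follow, the standalone sketch below models the two checks the balancer now performs. It is a toy illustration, not the actual grpclb code; every name in it is made up for this example. aggregateSubConnStates reduces the in-use SubConn states to READY / CONNECTING / TRANSIENT_FAILURE, and the balancer enters fallback only when it is not already in fallback, the connection to the remote balancer is lost, and the aggregated state is not READY. Independently of whether any SubConn reports a state change, refreshSubConns regenerates the picker whenever it switches between fallback and non-fallback, because cached SubConns may not emit new state updates.

// Hypothetical, self-contained sketch of the fallback decision added by this
// patch. Names below (state, aggregate, shouldEnterFallback) are illustrative
// only and do not exist in the grpclb package.
package main

import "fmt"

type state int

const (
	connecting state = iota
	ready
	transientFailure
)

func (s state) String() string {
	switch s {
	case ready:
		return "READY"
	case connecting:
		return "CONNECTING"
	default:
		return "TRANSIENT_FAILURE"
	}
}

// aggregate mirrors aggregateSubConnStates: READY wins, then CONNECTING,
// otherwise TRANSIENT_FAILURE.
func aggregate(scs []state) state {
	var numConnecting int
	for _, s := range scs {
		switch s {
		case ready:
			return ready
		case connecting:
			numConnecting++
		}
	}
	if numConnecting > 0 {
		return connecting
	}
	return transientFailure
}

// shouldEnterFallback mirrors the checks in HandleSubConnStateChange and
// watchRemoteBalancer: fall back only if the balancer is not already in
// fallback, the remote balancer connection is down, and no SubConn is READY.
func shouldEnterFallback(inFallback, remoteBalancerConnected bool, aggr state) bool {
	return !inFallback && !remoteBalancerConnected && aggr != ready
}

func main() {
	scs := []state{connecting, transientFailure}
	aggr := aggregate(scs)
	fmt.Println("aggregated state:", aggr)
	fmt.Println("enter fallback:", shouldEnterFallback(false, false, aggr))
}

The new restartableListener test helper exercises this path end to end: stopPreviousConns drops the live connections to the remote balancer and the backend, the client falls back to the resolver-provided backend, and restart plus a fresh server list moves it back to the balancer-provided backends.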