grpclb: fallback after init (#2681)

regenerate picker when switching between fallback/non-fallback, because new SubConn state might not be updated for cached SubConns
This commit is contained in:
Menghan Li
2019-04-02 16:27:52 -07:00
committed by GitHub
parent 955eb8a3c8
commit 4745f6ae0d
4 changed files with 253 additions and 51 deletions

View File

@ -172,7 +172,6 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
doneCh: make(chan struct{}),
manualResolver: r,
csEvltr: &balancer.ConnectivityStateEvaluator{},
subConns: make(map[resolver.Address]balancer.SubConn),
scStates: make(map[balancer.SubConn]connectivity.State),
picker: &errPicker{err: balancer.ErrNoSubConnAvailable},
@ -238,15 +237,15 @@ type lbBalancer struct {
// but with only READY SCs will be gerenated.
backendAddrs []resolver.Address
// Roundrobin functionalities.
csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State
subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn.
scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns.
picker balancer.Picker
// Support fallback to resolved backend addresses if there's no response
// from remote balancer within fallbackTimeout.
fallbackTimerExpired bool
serverListReceived bool
remoteBalancerConnected bool
serverListReceived bool
inFallback bool
// resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set
// when resolved address updates are received, and read in the goroutine
// handling fallback.
@ -264,13 +263,16 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
return
}
if lb.state == connectivity.Connecting {
lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
return
}
var readySCs []balancer.SubConn
if lb.usePickFirst {
if lb.state == connectivity.Ready || lb.state == connectivity.Idle {
for _, sc := range lb.subConns {
readySCs = append(readySCs, sc)
break
}
for _, sc := range lb.subConns {
readySCs = append(readySCs, sc)
break
}
} else {
for _, a := range lb.backendAddrs {
@ -286,10 +288,13 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
// If there's no ready SubConns, always re-pick. This is to avoid drops
// unless at least one SubConn is ready. Otherwise we may drop more
// often than want because of drops + re-picks(which become re-drops).
//
// This doesn't seem to be necessary after the connecting check above.
// Kept for safety.
lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
return
}
if len(lb.fullServerList) <= 0 {
if lb.inFallback {
lb.picker = newRRPicker(readySCs)
return
}
@ -305,6 +310,34 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
prevLBPicker.updateReadySCs(readySCs)
}
// aggregateSubConnStats calculate the aggregated state of SubConns in
// lb.SubConns. These SubConns are subconns in use (when switching between
// fallback and grpclb). lb.scState contains states for all SubConns, including
// those in cache (SubConns are cached for 10 seconds after remove).
//
// The aggregated state is:
// - If at least one SubConn in Ready, the aggregated state is Ready;
// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
// - Else the aggregated state is TransientFailure.
func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
var numConnecting uint64
for _, sc := range lb.subConns {
if state, ok := lb.scStates[sc]; ok {
switch state {
case connectivity.Ready:
return connectivity.Ready
case connectivity.Connecting:
numConnecting++
}
}
}
if numConnecting > 0 {
return connectivity.Connecting
}
return connectivity.TransientFailure
}
func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
@ -328,18 +361,33 @@ func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivi
// kept the sc's state in scStates. Remove state for this sc here.
delete(lb.scStates, sc)
}
oldAggrState := lb.state
lb.state = lb.csEvltr.RecordTransition(oldS, s)
// Regenerate picker when one of the following happens:
// Force regenerate picker if
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (oldS == connectivity.Ready) != (s == connectivity.Ready) ||
(lb.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
lb.regeneratePicker(false)
lb.updateStateAndPicker((oldS == connectivity.Ready) != (s == connectivity.Ready), false)
// Enter fallback when the aggregated state is not Ready and the connection
// to remote balancer is lost.
if lb.state != connectivity.Ready {
if !lb.inFallback && !lb.remoteBalancerConnected {
// Enter fallback.
lb.refreshSubConns(lb.resolvedBackendAddrs, false)
}
}
}
// updateStateAndPicker re-calculate the aggregated state, and regenerate picker
// if overall state is changed.
//
// If forceRegeneratePicker is true, picker will be regenerated.
func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop bool) {
oldAggrState := lb.state
lb.state = lb.aggregateSubConnStates()
// Regenerate picker when one of the following happens:
// - caller wants to regenerate
// - the aggregated state changed
if forceRegeneratePicker || (lb.state != oldAggrState) {
lb.regeneratePicker(resetDrop)
}
lb.cc.UpdateBalancerState(lb.state, lb.picker)
@ -357,11 +405,11 @@ func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
return
}
lb.mu.Lock()
if lb.serverListReceived {
if lb.inFallback || lb.serverListReceived {
lb.mu.Unlock()
return
}
lb.fallbackTimerExpired = true
// Enter fallback.
lb.refreshSubConns(lb.resolvedBackendAddrs, false)
lb.mu.Unlock()
}
@ -405,10 +453,7 @@ func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
lb.mu.Lock()
lb.resolvedBackendAddrs = backendAddrs
// If serverListReceived is true, connection to remote balancer was
// successful and there's no need to do fallback anymore.
// If fallbackTimerExpired is false, fallback hasn't happened yet.
if !lb.serverListReceived && lb.fallbackTimerExpired {
if lb.inFallback {
// This means we received a new list of resolved backends, and we are
// still in fallback mode. Need to update the list of backends we are
// using to the new list of backends.

View File

@ -85,24 +85,26 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
backendAddrs = append(backendAddrs, addr)
}
// Call refreshSubConns to create/remove SubConns.
// Call refreshSubConns to create/remove SubConns. If we are in fallback,
// this is also exiting fallback.
lb.refreshSubConns(backendAddrs, true)
// Regenerate and update picker no matter if there's update on backends (if
// any SubConn will be newed/removed). Because since the full serverList was
// different, there might be updates in drops or pick weights(different
// number of duplicates). We need to update picker with the fulllist.
//
// Now with cache, even if SubConn was newed/removed, there might be no
// state changes.
lb.regeneratePicker(true)
lb.cc.UpdateBalancerState(lb.state, lb.picker)
}
// refreshSubConns creates/removes SubConns with backendAddrs. It returns a bool
// indicating whether the backendAddrs are different from the cached
// backendAddrs (whether any SubConn was newed/removed).
// refreshSubConns creates/removes SubConns with backendAddrs, and refreshes
// balancer state and picker.
//
// Caller must hold lb.mu.
func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fromGRPCLBServer bool) {
defer func() {
// Regenerate and update picker after refreshing subconns because with
// cache, even if SubConn was newed/removed, there might be no state
// changes (the subconn will be kept in cache, not actually
// newed/removed).
lb.updateStateAndPicker(true, true)
}()
lb.inFallback = !fromGRPCLBServer
opts := balancer.NewSubConnOptions{}
if fromGRPCLBServer {
opts.CredsBundle = lb.grpclbBackendCreds
@ -218,6 +220,9 @@ func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) {
if err != nil {
return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
}
lb.mu.Lock()
lb.remoteBalancerConnected = true
lb.mu.Unlock()
// grpclb handshake on the stream.
initReq := &lbpb.LoadBalanceRequest{
@ -270,6 +275,17 @@ func (lb *lbBalancer) watchRemoteBalancer() {
// Trigger a re-resolve when the stream errors.
lb.cc.cc.ResolveNow(resolver.ResolveNowOption{})
lb.mu.Lock()
lb.remoteBalancerConnected = false
lb.fullServerList = nil
// Enter fallback when connection to remote balancer is lost, and the
// aggregated state is not Ready.
if !lb.inFallback && lb.state != connectivity.Ready {
// Entering fallback.
lb.refreshSubConns(lb.resolvedBackendAddrs, false)
}
lb.mu.Unlock()
if !doBackoff {
retryCount = 0
continue

View File

@ -230,18 +230,21 @@ func (b *remoteBalancer) BalanceLoad(stream lbgrpc.LoadBalancer_BalanceLoadServe
b.stats.merge(req.GetClientStats())
}
}()
for v := range b.sls {
resp = &lbpb.LoadBalanceResponse{
LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{
ServerList: v,
},
for {
select {
case v := <-b.sls:
resp = &lbpb.LoadBalanceResponse{
LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{
ServerList: v,
},
}
case <-stream.Context().Done():
return stream.Context().Err()
}
if err := stream.Send(resp); err != nil {
return err
}
}
<-b.done
return nil
}
type testServer struct {
@ -297,6 +300,9 @@ type testServers struct {
backends []*grpc.Server
beIPs []net.IP
bePorts []int
lbListener net.Listener
beListeners []net.Listener
}
func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), err error) {
@ -317,7 +323,7 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er
beIPs = append(beIPs, beLis.Addr().(*net.TCPAddr).IP)
bePorts = append(bePorts, beLis.Addr().(*net.TCPAddr).Port)
beListeners = append(beListeners, beLis)
beListeners = append(beListeners, newRestartableListener(beLis))
}
backends := startBackends(beServerName, false, beListeners...)
@ -327,6 +333,7 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er
err = fmt.Errorf("failed to create the listener for the load balancer %v", err)
return
}
lbLis = newRestartableListener(lbLis)
lbCreds := &serverNameCheckCreds{
sn: lbServerName,
}
@ -344,6 +351,9 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er
backends: backends,
beIPs: beIPs,
bePorts: bePorts,
lbListener: lbLis,
beListeners: beListeners,
}
cleanup = func() {
defer stopBackends(backends)
@ -712,7 +722,7 @@ func TestFallback(t *testing.T) {
testC := testpb.NewTestServiceClient(cc)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{
Addr: "",
Addr: "invalid.address",
Type: resolver.GRPCLB,
ServerName: lbServerName,
}, {
@ -723,7 +733,7 @@ func TestFallback(t *testing.T) {
var p peer.Peer
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
if p.Addr.String() != beLis.Addr().String() {
t.Fatalf("got peer: %v, want peer: %v", p.Addr, beLis.Addr())
@ -739,16 +749,62 @@ func TestFallback(t *testing.T) {
ServerName: beServerName,
}}})
var backendUsed bool
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] {
return
backendUsed = true
break
}
time.Sleep(time.Millisecond)
}
t.Fatalf("No RPC sent to backend behind remote balancer after 1 second")
if !backendUsed {
t.Fatalf("No RPC sent to backend behind remote balancer after 1 second")
}
// Close backend and remote balancer connections, should use fallback.
tss.beListeners[0].(*restartableListener).stopPreviousConns()
tss.lbListener.(*restartableListener).stopPreviousConns()
time.Sleep(time.Second)
var fallbackUsed bool
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.String() == beLis.Addr().String() {
fallbackUsed = true
break
}
time.Sleep(time.Millisecond)
}
if !fallbackUsed {
t.Fatalf("No RPC sent to fallback after 1 second")
}
// Restart backend and remote balancer, should not use backends.
tss.beListeners[0].(*restartableListener).restart()
tss.lbListener.(*restartableListener).restart()
tss.ls.sls <- sl
time.Sleep(time.Second)
var backendUsed2 bool
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] {
backendUsed2 = true
break
}
time.Sleep(time.Millisecond)
}
if !backendUsed2 {
t.Fatalf("No RPC sent to backend behind remote balancer after 1 second")
}
}
// The remote balancer sends response with duplicates to grpclb client.

View File

@ -0,0 +1,85 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpclb
import (
"net"
"sync"
)
type tempError struct{}
func (*tempError) Error() string {
return "grpclb test temporary error"
}
func (*tempError) Temporary() bool {
return true
}
type restartableListener struct {
net.Listener
addr string
mu sync.Mutex
closed bool
conns []net.Conn
}
func newRestartableListener(l net.Listener) *restartableListener {
return &restartableListener{
Listener: l,
addr: l.Addr().String(),
}
}
func (l *restartableListener) Accept() (conn net.Conn, err error) {
conn, err = l.Listener.Accept()
if err == nil {
l.mu.Lock()
if l.closed {
conn.Close()
l.mu.Unlock()
return nil, &tempError{}
}
l.conns = append(l.conns, conn)
l.mu.Unlock()
}
return
}
func (l *restartableListener) Close() error {
return l.Listener.Close()
}
func (l *restartableListener) stopPreviousConns() {
l.mu.Lock()
l.closed = true
tmp := l.conns
l.conns = nil
l.mu.Unlock()
for _, conn := range tmp {
conn.Close()
}
}
func (l *restartableListener) restart() {
l.mu.Lock()
l.closed = false
l.mu.Unlock()
}