grpclb: enter fallback if no balancer addresses are available (#3119)

This is necessary because there's another way to select grpclb (by specifying grpclb in service config's balancing policy field). So it's possible that grpclb is picked, but resolver doesn't have any balancer addresses.

When an update without balancer address is received, grpclb closes the underlying ClientConn to remote balancer, and enters fallback mode.

Note that grpclb waits until the ClientConn and the RPC goroutines are actually closed to do the fallback work. This can avoid race caused by async close.
This commit is contained in:
Menghan Li
2019-10-31 09:43:18 -07:00
committed by GitHub
parent ff5f0e93f5
commit 459a38a0e3
4 changed files with 301 additions and 128 deletions

View File

@ -187,7 +187,7 @@ type lbBalancer struct {
// send to remote LB ClientConn through this resolver. // send to remote LB ClientConn through this resolver.
manualResolver *lbManualResolver manualResolver *lbManualResolver
// The ClientConn to talk to the remote balancer. // The ClientConn to talk to the remote balancer.
ccRemoteLB *grpc.ClientConn ccRemoteLB *remoteBalancerCCWrapper
// backoff for calling remote balancer. // backoff for calling remote balancer.
backoff backoff.Strategy backoff backoff.Strategy
@ -426,6 +426,8 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
addrs := ccs.ResolverState.Addresses addrs := ccs.ResolverState.Addresses
if len(addrs) == 0 { if len(addrs) == 0 {
// There should be at least one address, either grpclb server or
// fallback. Empty address is not valid.
return balancer.ErrBadResolverState return balancer.ErrBadResolverState
} }
@ -439,28 +441,33 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
} }
} }
if lb.ccRemoteLB == nil { if len(remoteBalancerAddrs) == 0 {
if len(remoteBalancerAddrs) == 0 { if lb.ccRemoteLB != nil {
grpclog.Errorf("grpclb: no remote balancer address is available, should never happen") lb.ccRemoteLB.close()
return balancer.ErrBadResolverState lb.ccRemoteLB = nil
} }
} else if lb.ccRemoteLB == nil {
// First time receiving resolved addresses, create a cc to remote // First time receiving resolved addresses, create a cc to remote
// balancers. // balancers.
lb.dialRemoteLB(remoteBalancerAddrs[0].ServerName) lb.newRemoteBalancerCCWrapper()
// Start the fallback goroutine. // Start the fallback goroutine.
go lb.fallbackToBackendsAfter(lb.fallbackTimeout) go lb.fallbackToBackendsAfter(lb.fallbackTimeout)
} }
// cc to remote balancers uses lb.manualResolver. Send the updated remote if lb.ccRemoteLB != nil {
// balancer addresses to it through manualResolver. // cc to remote balancers uses lb.manualResolver. Send the updated remote
lb.manualResolver.UpdateState(resolver.State{Addresses: remoteBalancerAddrs}) // balancer addresses to it through manualResolver.
lb.manualResolver.UpdateState(resolver.State{Addresses: remoteBalancerAddrs})
}
lb.mu.Lock() lb.mu.Lock()
lb.resolvedBackendAddrs = backendAddrs lb.resolvedBackendAddrs = backendAddrs
if lb.inFallback { if len(remoteBalancerAddrs) == 0 || lb.inFallback {
// This means we received a new list of resolved backends, and we are // If there's no remote balancer address in ClientConn update, grpclb
// still in fallback mode. Need to update the list of backends we are // enters fallback mode immediately.
// using to the new list of backends. //
// If a new update is received while grpclb is in fallback, update the
// list of backends being used to the new fallback backends.
lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst) lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
} }
lb.mu.Unlock() lb.mu.Unlock()
@ -475,7 +482,7 @@ func (lb *lbBalancer) Close() {
} }
close(lb.doneCh) close(lb.doneCh)
if lb.ccRemoteLB != nil { if lb.ccRemoteLB != nil {
lb.ccRemoteLB.Close() lb.ccRemoteLB.close()
} }
lb.cc.close() lb.cc.close()
} }

View File

@ -23,6 +23,7 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"sync"
"time" "time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -34,6 +35,7 @@ import (
"google.golang.org/grpc/connectivity" "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal" "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
@ -107,6 +109,12 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
fallbackModeChanged := lb.inFallback != fallback fallbackModeChanged := lb.inFallback != fallback
lb.inFallback = fallback lb.inFallback = fallback
if fallbackModeChanged && lb.inFallback {
// Clear previous received list when entering fallback, so if the server
// comes back and sends the same list again, the new addresses will be
// used.
lb.fullServerList = nil
}
balancingPolicyChanged := lb.usePickFirst != pickFirst balancingPolicyChanged := lb.usePickFirst != pickFirst
oldUsePickFirst := lb.usePickFirst oldUsePickFirst := lb.usePickFirst
@ -196,7 +204,71 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
lb.updateStateAndPicker(true, true) lb.updateStateAndPicker(true, true)
} }
func (lb *lbBalancer) readServerList(s *balanceLoadClientStream) error { type remoteBalancerCCWrapper struct {
cc *grpc.ClientConn
lb *lbBalancer
backoff backoff.Strategy
done chan struct{}
// waitgroup to wait for all goroutines to exit.
wg sync.WaitGroup
}
func (lb *lbBalancer) newRemoteBalancerCCWrapper() {
var dopts []grpc.DialOption
if creds := lb.opt.DialCreds; creds != nil {
dopts = append(dopts, grpc.WithTransportCredentials(creds))
} else if bundle := lb.grpclbClientConnCreds; bundle != nil {
dopts = append(dopts, grpc.WithCredentialsBundle(bundle))
} else {
dopts = append(dopts, grpc.WithInsecure())
}
if lb.opt.Dialer != nil {
dopts = append(dopts, grpc.WithContextDialer(lb.opt.Dialer))
}
// Explicitly set pickfirst as the balancer.
dopts = append(dopts, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first"}`))
wrb := internal.WithResolverBuilder.(func(resolver.Builder) grpc.DialOption)
dopts = append(dopts, wrb(lb.manualResolver))
if channelz.IsOn() {
dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID))
}
// Enable Keepalive for grpclb client.
dopts = append(dopts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 20 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: true,
}))
// The dial target is not important.
//
// The grpclb server addresses will set field ServerName, and creds will
// receive ServerName as authority.
cc, err := grpc.DialContext(context.Background(), "grpclb.subClientConn", dopts...)
if err != nil {
grpclog.Fatalf("failed to dial: %v", err)
}
ccw := &remoteBalancerCCWrapper{
cc: cc,
lb: lb,
backoff: lb.backoff,
done: make(chan struct{}),
}
lb.ccRemoteLB = ccw
ccw.wg.Add(1)
go ccw.watchRemoteBalancer()
}
// close closed the ClientConn to remote balancer, and waits until all
// goroutines to finish.
func (ccw *remoteBalancerCCWrapper) close() {
close(ccw.done)
ccw.cc.Close()
ccw.wg.Wait()
}
func (ccw *remoteBalancerCCWrapper) readServerList(s *balanceLoadClientStream) error {
for { for {
reply, err := s.Recv() reply, err := s.Recv()
if err != nil { if err != nil {
@ -206,12 +278,12 @@ func (lb *lbBalancer) readServerList(s *balanceLoadClientStream) error {
return fmt.Errorf("grpclb: failed to recv server list: %v", err) return fmt.Errorf("grpclb: failed to recv server list: %v", err)
} }
if serverList := reply.GetServerList(); serverList != nil { if serverList := reply.GetServerList(); serverList != nil {
lb.processServerList(serverList) ccw.lb.processServerList(serverList)
} }
} }
} }
func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) { func (ccw *remoteBalancerCCWrapper) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) {
ticker := time.NewTicker(interval) ticker := time.NewTicker(interval)
defer ticker.Stop() defer ticker.Stop()
for { for {
@ -220,7 +292,7 @@ func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.D
case <-s.Context().Done(): case <-s.Context().Done():
return return
} }
stats := lb.clientStats.toClientStats() stats := ccw.lb.clientStats.toClientStats()
t := time.Now() t := time.Now()
stats.Timestamp = &timestamppb.Timestamp{ stats.Timestamp = &timestamppb.Timestamp{
Seconds: t.Unix(), Seconds: t.Unix(),
@ -236,23 +308,23 @@ func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.D
} }
} }
func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) { func (ccw *remoteBalancerCCWrapper) callRemoteBalancer() (backoff bool, _ error) {
lbClient := &loadBalancerClient{cc: lb.ccRemoteLB} lbClient := &loadBalancerClient{cc: ccw.cc}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
stream, err := lbClient.BalanceLoad(ctx, grpc.WaitForReady(true)) stream, err := lbClient.BalanceLoad(ctx, grpc.WaitForReady(true))
if err != nil { if err != nil {
return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err) return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
} }
lb.mu.Lock() ccw.lb.mu.Lock()
lb.remoteBalancerConnected = true ccw.lb.remoteBalancerConnected = true
lb.mu.Unlock() ccw.lb.mu.Unlock()
// grpclb handshake on the stream. // grpclb handshake on the stream.
initReq := &lbpb.LoadBalanceRequest{ initReq := &lbpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{ LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
InitialRequest: &lbpb.InitialLoadBalanceRequest{ InitialRequest: &lbpb.InitialLoadBalanceRequest{
Name: lb.target, Name: ccw.lb.target,
}, },
}, },
} }
@ -271,21 +343,24 @@ func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) {
return true, fmt.Errorf("grpclb: Delegation is not supported") return true, fmt.Errorf("grpclb: Delegation is not supported")
} }
ccw.wg.Add(1)
go func() { go func() {
defer ccw.wg.Done()
if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 { if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
lb.sendLoadReport(stream, d) ccw.sendLoadReport(stream, d)
} }
}() }()
// No backoff if init req/resp handshake was successful. // No backoff if init req/resp handshake was successful.
return false, lb.readServerList(stream) return false, ccw.readServerList(stream)
} }
func (lb *lbBalancer) watchRemoteBalancer() { func (ccw *remoteBalancerCCWrapper) watchRemoteBalancer() {
defer ccw.wg.Done()
var retryCount int var retryCount int
for { for {
doBackoff, err := lb.callRemoteBalancer() doBackoff, err := ccw.callRemoteBalancer()
select { select {
case <-lb.doneCh: case <-ccw.done:
return return
default: default:
if err != nil { if err != nil {
@ -297,76 +372,31 @@ func (lb *lbBalancer) watchRemoteBalancer() {
} }
} }
// Trigger a re-resolve when the stream errors. // Trigger a re-resolve when the stream errors.
lb.cc.cc.ResolveNow(resolver.ResolveNowOption{}) ccw.lb.cc.cc.ResolveNow(resolver.ResolveNowOption{})
lb.mu.Lock() ccw.lb.mu.Lock()
lb.remoteBalancerConnected = false ccw.lb.remoteBalancerConnected = false
lb.fullServerList = nil ccw.lb.fullServerList = nil
// Enter fallback when connection to remote balancer is lost, and the // Enter fallback when connection to remote balancer is lost, and the
// aggregated state is not Ready. // aggregated state is not Ready.
if !lb.inFallback && lb.state != connectivity.Ready { if !ccw.lb.inFallback && ccw.lb.state != connectivity.Ready {
// Entering fallback. // Entering fallback.
lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst) ccw.lb.refreshSubConns(ccw.lb.resolvedBackendAddrs, true, ccw.lb.usePickFirst)
} }
lb.mu.Unlock() ccw.lb.mu.Unlock()
if !doBackoff { if !doBackoff {
retryCount = 0 retryCount = 0
continue continue
} }
timer := time.NewTimer(lb.backoff.Backoff(retryCount)) timer := time.NewTimer(ccw.backoff.Backoff(retryCount)) // Copy backoff
select { select {
case <-timer.C: case <-timer.C:
case <-lb.doneCh: case <-ccw.done:
timer.Stop() timer.Stop()
return return
} }
retryCount++ retryCount++
} }
} }
func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
var dopts []grpc.DialOption
if creds := lb.opt.DialCreds; creds != nil {
if err := creds.OverrideServerName(remoteLBName); err == nil {
dopts = append(dopts, grpc.WithTransportCredentials(creds))
} else {
grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v, using Insecure", err)
dopts = append(dopts, grpc.WithInsecure())
}
} else if bundle := lb.grpclbClientConnCreds; bundle != nil {
dopts = append(dopts, grpc.WithCredentialsBundle(bundle))
} else {
dopts = append(dopts, grpc.WithInsecure())
}
if lb.opt.Dialer != nil {
dopts = append(dopts, grpc.WithContextDialer(lb.opt.Dialer))
}
// Explicitly set pickfirst as the balancer.
dopts = append(dopts, grpc.WithBalancerName(grpc.PickFirstBalancerName))
wrb := internal.WithResolverBuilder.(func(resolver.Builder) grpc.DialOption)
dopts = append(dopts, wrb(lb.manualResolver))
if channelz.IsOn() {
dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID))
}
// Enable Keepalive for grpclb client.
dopts = append(dopts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 20 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: true,
}))
// DialContext using manualResolver.Scheme, which is a random scheme
// generated when init grpclb. The target scheme here is not important.
//
// The grpc dial target will be used by the creds (ALTS) as the authority,
// so it has to be set to remoteLBName that comes from resolver.
cc, err := grpc.DialContext(context.Background(), remoteLBName, dopts...)
if err != nil {
grpclog.Fatalf("failed to dial: %v", err)
}
lb.ccRemoteLB = cc
go lb.watchRemoteBalancer()
}

View File

@ -49,8 +49,8 @@ import (
) )
var ( var (
lbServerName = "bar.com" lbServerName = "lb.server.com"
beServerName = "foo.com" beServerName = "backends.com"
lbToken = "iamatoken" lbToken = "iamatoken"
// Resolver replaces localhost with fakeName in Next(). // Resolver replaces localhost with fakeName in Next().
@ -60,9 +60,8 @@ var (
) )
type serverNameCheckCreds struct { type serverNameCheckCreds struct {
mu sync.Mutex mu sync.Mutex
sn string sn string
expected string
} }
func (c *serverNameCheckCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { func (c *serverNameCheckCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
@ -72,10 +71,10 @@ func (c *serverNameCheckCreds) ServerHandshake(rawConn net.Conn) (net.Conn, cred
} }
return rawConn, nil, nil return rawConn, nil, nil
} }
func (c *serverNameCheckCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { func (c *serverNameCheckCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
b := make([]byte, len(c.expected)) b := make([]byte, len(authority))
errCh := make(chan error, 1) errCh := make(chan error, 1)
go func() { go func() {
_, err := rawConn.Read(b) _, err := rawConn.Read(b)
@ -84,34 +83,25 @@ func (c *serverNameCheckCreds) ClientHandshake(ctx context.Context, addr string,
select { select {
case err := <-errCh: case err := <-errCh:
if err != nil { if err != nil {
fmt.Printf("Failed to read the server name from the server %v", err) fmt.Printf("test-creds: failed to read expected authority name from the server: %v\n", err)
return nil, nil, err return nil, nil, err
} }
case <-ctx.Done(): case <-ctx.Done():
return nil, nil, ctx.Err() return nil, nil, ctx.Err()
} }
if c.expected != string(b) { if authority != string(b) {
fmt.Printf("Read the server name %s want %s", string(b), c.expected) fmt.Printf("test-creds: got authority from ClientConn %q, expected by server %q\n", authority, string(b))
return nil, nil, errors.New("received unexpected server name") return nil, nil, errors.New("received unexpected server name")
} }
return rawConn, nil, nil return rawConn, nil, nil
} }
func (c *serverNameCheckCreds) Info() credentials.ProtocolInfo { func (c *serverNameCheckCreds) Info() credentials.ProtocolInfo {
c.mu.Lock()
defer c.mu.Unlock()
return credentials.ProtocolInfo{} return credentials.ProtocolInfo{}
} }
func (c *serverNameCheckCreds) Clone() credentials.TransportCredentials { func (c *serverNameCheckCreds) Clone() credentials.TransportCredentials {
c.mu.Lock() return &serverNameCheckCreds{}
defer c.mu.Unlock()
return &serverNameCheckCreds{
expected: c.expected,
}
} }
func (c *serverNameCheckCreds) OverrideServerName(s string) error { func (c *serverNameCheckCreds) OverrideServerName(s string) error {
c.mu.Lock()
defer c.mu.Unlock()
c.expected = s
return nil return nil
} }
@ -388,9 +378,7 @@ func TestGRPCLB(t *testing.T) {
Servers: bes, Servers: bes,
} }
tss.ls.sls <- sl tss.ls.sls <- sl
creds := serverNameCheckCreds{ creds := serverNameCheckCreds{}
expected: beServerName,
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
@ -439,9 +427,7 @@ func TestGRPCLBWeighted(t *testing.T) {
portsToIndex[tss.bePorts[i]] = i portsToIndex[tss.bePorts[i]] = i
} }
creds := serverNameCheckCreds{ creds := serverNameCheckCreds{}
expected: beServerName,
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
@ -509,9 +495,7 @@ func TestDropRequest(t *testing.T) {
Drop: true, Drop: true,
}}, }},
} }
creds := serverNameCheckCreds{ creds := serverNameCheckCreds{}
expected: beServerName,
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
@ -671,9 +655,7 @@ func TestBalancerDisconnects(t *testing.T) {
lbs = append(lbs, tss.lb) lbs = append(lbs, tss.lb)
} }
creds := serverNameCheckCreds{ creds := serverNameCheckCreds{}
expected: beServerName,
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
@ -752,9 +734,7 @@ func TestFallback(t *testing.T) {
Servers: bes, Servers: bes,
} }
tss.ls.sls <- sl tss.ls.sls <- sl
creds := serverNameCheckCreds{ creds := serverNameCheckCreds{}
expected: beServerName,
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
@ -770,9 +750,8 @@ func TestFallback(t *testing.T) {
Type: resolver.GRPCLB, Type: resolver.GRPCLB,
ServerName: lbServerName, ServerName: lbServerName,
}, { }, {
Addr: beLis.Addr().String(), Addr: beLis.Addr().String(),
Type: resolver.Backend, Type: resolver.Backend,
ServerName: beServerName,
}}}) }}})
var p peer.Peer var p peer.Peer
@ -788,9 +767,8 @@ func TestFallback(t *testing.T) {
Type: resolver.GRPCLB, Type: resolver.GRPCLB,
ServerName: lbServerName, ServerName: lbServerName,
}, { }, {
Addr: beLis.Addr().String(), Addr: beLis.Addr().String(),
Type: resolver.Backend, Type: resolver.Backend,
ServerName: beServerName,
}}}) }}})
var backendUsed bool var backendUsed bool
@ -851,6 +829,165 @@ func TestFallback(t *testing.T) {
} }
} }
type pickfirstFailOnEmptyAddrsListBuilder struct {
balancer.Builder // pick_first builder.
}
func (b *pickfirstFailOnEmptyAddrsListBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
pf := b.Builder.Build(cc, opts)
return &pickfirstFailOnEmptyAddrsList{pf}
}
type pickfirstFailOnEmptyAddrsList struct {
balancer.Balancer // pick_first balancer.
}
func (b *pickfirstFailOnEmptyAddrsList) UpdateClientConnState(s balancer.ClientConnState) error {
addrs := s.ResolverState.Addresses
if len(addrs) == 0 {
return balancer.ErrBadResolverState
}
b.Balancer.HandleResolvedAddrs(addrs, nil)
return nil
}
func (b *pickfirstFailOnEmptyAddrsList) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
b.Balancer.HandleSubConnStateChange(sc, state.ConnectivityState)
}
func (b *pickfirstFailOnEmptyAddrsList) ResolverError(error) {}
func TestFallBackWithNoServerAddress(t *testing.T) {
defer leakcheck.Check(t)
defer func() func() {
// Override pick_first with a balancer that returns error to trigger
// re-resolve, to test that when grpclb accepts no server address,
// re-resolve is never triggered.
pfb := balancer.Get("pick_first")
balancer.Register(&pickfirstFailOnEmptyAddrsListBuilder{pfb})
return func() { balancer.Register(pfb) }
}()()
resolveNowCh := make(chan struct{}, 1)
r, cleanup := manual.GenerateAndRegisterManualResolver()
r.ResolveNowCallback = func(resolver.ResolveNowOption) {
select {
case <-resolveNowCh:
default:
}
resolveNowCh <- struct{}{}
}
defer cleanup()
tss, cleanup, err := newLoadBalancer(1)
if err != nil {
t.Fatalf("failed to create new load balancer: %v", err)
}
defer cleanup()
// Start a standalone backend.
beLis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to listen %v", err)
}
defer beLis.Close()
standaloneBEs := startBackends(beServerName, true, beLis)
defer stopBackends(standaloneBEs)
be := &lbpb.Server{
IpAddress: tss.beIPs[0],
Port: int32(tss.bePorts[0]),
LoadBalanceToken: lbToken,
}
var bes []*lbpb.Server
bes = append(bes, be)
sl := &lbpb.ServerList{
Servers: bes,
}
creds := serverNameCheckCreds{}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
defer cc.Close()
testC := testpb.NewTestServiceClient(cc)
// Select grpclb with service config.
const pfc = `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}}]}}]}`
scpr := r.CC.ParseServiceConfig(pfc)
if scpr.Err != nil {
t.Fatalf("Error parsing config %q: %v", pfc, scpr.Err)
}
for i := 0; i < 2; i++ {
// Send an update with only backend address. grpclb should enter fallback
// and use the fallback backend.
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{
Addr: beLis.Addr().String(),
Type: resolver.Backend,
}},
ServiceConfig: scpr,
})
select {
case <-resolveNowCh:
t.Errorf("unexpected resolveNow when grpclb gets no balancer address 1111, %d", i)
case <-time.After(time.Second):
}
var p peer.Peer
rpcCtx, rpcCancel := context.WithTimeout(context.Background(), time.Second)
defer rpcCancel()
if _, err := testC.EmptyCall(rpcCtx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
if p.Addr.String() != beLis.Addr().String() {
t.Fatalf("got peer: %v, want peer: %v", p.Addr, beLis.Addr())
}
select {
case <-resolveNowCh:
t.Errorf("unexpected resolveNow when grpclb gets no balancer address 2222, %d", i)
case <-time.After(time.Second):
}
tss.ls.sls <- sl
// Send an update with balancer address. The backends behind grpclb should
// be used.
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}, {
Addr: beLis.Addr().String(),
Type: resolver.Backend,
}},
ServiceConfig: scpr,
})
var backendUsed bool
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] {
backendUsed = true
break
}
time.Sleep(time.Millisecond)
}
if !backendUsed {
t.Fatalf("No RPC sent to backend behind remote balancer after 1 second")
}
}
}
func TestGRPCLBPickFirst(t *testing.T) { func TestGRPCLBPickFirst(t *testing.T) {
defer leakcheck.Check(t) defer leakcheck.Check(t)
@ -881,9 +1018,7 @@ func TestGRPCLBPickFirst(t *testing.T) {
portsToIndex[tss.bePorts[i]] = i portsToIndex[tss.bePorts[i]] = i
} }
creds := serverNameCheckCreds{ creds := serverNameCheckCreds{}
expected: beServerName,
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
@ -1034,7 +1169,7 @@ func runAndGetStats(t *testing.T, drop bool, runRPCs func(*grpc.ClientConn)) *rp
} }
tss.ls.sls <- &lbpb.ServerList{Servers: servers} tss.ls.sls <- &lbpb.ServerList{Servers: servers}
tss.ls.statsDura = 100 * time.Millisecond tss.ls.statsDura = 100 * time.Millisecond
creds := serverNameCheckCreds{expected: beServerName} creds := serverNameCheckCreds{}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()

1
vet.sh
View File

@ -115,6 +115,7 @@ fi
staticcheck -go 1.9 -checks 'inherit,-ST1015' -ignore ' staticcheck -go 1.9 -checks 'inherit,-ST1015' -ignore '
google.golang.org/grpc/balancer.go:SA1019 google.golang.org/grpc/balancer.go:SA1019
google.golang.org/grpc/balancer/grpclb/grpclb_remote_balancer.go:SA1019 google.golang.org/grpc/balancer/grpclb/grpclb_remote_balancer.go:SA1019
google.golang.org/grpc/balancer/grpclb/grpclb_test.go:SA1019
google.golang.org/grpc/balancer/roundrobin/roundrobin_test.go:SA1019 google.golang.org/grpc/balancer/roundrobin/roundrobin_test.go:SA1019
google.golang.org/grpc/xds/internal/balancer/edsbalancer/balancergroup.go:SA1019 google.golang.org/grpc/xds/internal/balancer/edsbalancer/balancergroup.go:SA1019
google.golang.org/grpc/xds/internal/resolver/xds_resolver.go:SA1019 google.golang.org/grpc/xds/internal/resolver/xds_resolver.go:SA1019