diff --git a/balancer/grpclb/grpclb.go b/balancer/grpclb/grpclb.go index 15cbc105..0a48533c 100644 --- a/balancer/grpclb/grpclb.go +++ b/balancer/grpclb/grpclb.go @@ -37,6 +37,7 @@ import ( lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/resolver" ) @@ -46,6 +47,19 @@ const ( grpclbName = "grpclb" ) +var ( + // defaultBackoffConfig configures the backoff strategy that's used when the + // init handshake in the RPC is unsuccessful. It's not for the clientconn + // reconnect backoff. + // + // It has the same value as the default grpc.DefaultBackoffConfig. + // + // TODO: make backoff configurable. + defaultBackoffConfig = backoff.Exponential{ + MaxDelay: 120 * time.Second, + } +) + func convertDuration(d *durationpb.Duration) time.Duration { if d == nil { return 0 @@ -147,6 +161,7 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal scStates: make(map[balancer.SubConn]connectivity.State), picker: &errPicker{err: balancer.ErrNoSubConnAvailable}, clientStats: newRPCStats(), + backoff: defaultBackoffConfig, // TODO: make backoff configurable. } return lb @@ -165,6 +180,8 @@ type lbBalancer struct { manualResolver *lbManualResolver // The ClientConn to talk to the remote balancer. ccRemoteLB *grpc.ClientConn + // backoff is the strategy for delaying retries of the remote balancer call after a failed init handshake. + backoff backoff.Strategy // Support client side load reporting. Each picker gets a reference to this, // and will update its content. 
diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index 8c399b33..fb4c117b 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -179,13 +179,13 @@ func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.D } } -func (lb *lbBalancer) callRemoteBalancer() error { +func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) { lbClient := &loadBalancerClient{cc: lb.ccRemoteLB} ctx, cancel := context.WithCancel(context.Background()) defer cancel() stream, err := lbClient.BalanceLoad(ctx, grpc.FailFast(false)) if err != nil { - return fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err) + return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err) } // grpclb handshake on the stream. @@ -197,18 +197,18 @@ func (lb *lbBalancer) callRemoteBalancer() error { }, } if err := stream.Send(initReq); err != nil { - return fmt.Errorf("grpclb: failed to send init request: %v", err) + return true, fmt.Errorf("grpclb: failed to send init request: %v", err) } reply, err := stream.Recv() if err != nil { - return fmt.Errorf("grpclb: failed to recv init response: %v", err) + return true, fmt.Errorf("grpclb: failed to recv init response: %v", err) } initResp := reply.GetInitialResponse() if initResp == nil { - return fmt.Errorf("grpclb: reply from remote balancer did not include initial response") + return true, fmt.Errorf("grpclb: reply from remote balancer did not include initial response") } if initResp.LoadBalancerDelegate != "" { - return fmt.Errorf("grpclb: Delegation is not supported") + return true, fmt.Errorf("grpclb: Delegation is not supported") } go func() { @@ -216,12 +216,14 @@ func (lb *lbBalancer) callRemoteBalancer() error { lb.sendLoadReport(stream, d) } }() - return lb.readServerList(stream) + // No backoff if init req/resp handshake was successful. 
+ return false, lb.readServerList(stream) } func (lb *lbBalancer) watchRemoteBalancer() { + var retryCount int for { - err := lb.callRemoteBalancer() + doBackoff, err := lb.callRemoteBalancer() select { case <-lb.doneCh: return @@ -231,6 +233,19 @@ func (lb *lbBalancer) watchRemoteBalancer() { } } + if !doBackoff { + retryCount = 0 + continue + } + + timer := time.NewTimer(lb.backoff.Backoff(retryCount)) + select { + case <-timer.C: + case <-lb.doneCh: + timer.Stop() + return + } + retryCount++ } }