grpclb: backoff for RPC call if init handshake was unsucessful (#2077)

This commit is contained in:
Menghan Li
2018-06-14 14:31:41 -07:00
committed by GitHub
parent e218c924aa
commit 692f13ae39
2 changed files with 40 additions and 8 deletions

View File

@ -37,6 +37,7 @@ import (
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
"google.golang.org/grpc/connectivity" "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
) )
@ -46,6 +47,19 @@ const (
grpclbName = "grpclb" grpclbName = "grpclb"
) )
var (
// defaultBackoffConfig configures the backoff strategy that's used when the
// init handshake in the RPC is unsuccessful. It's not for the clientconn
// reconnect backoff.
//
// It has the same value as the default grpc.DefaultBackoffConfig.
//
// TODO: make backoff configurable.
defaultBackoffConfig = backoff.Exponential{
MaxDelay: 120 * time.Second,
}
)
func convertDuration(d *durationpb.Duration) time.Duration { func convertDuration(d *durationpb.Duration) time.Duration {
if d == nil { if d == nil {
return 0 return 0
@ -147,6 +161,7 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
scStates: make(map[balancer.SubConn]connectivity.State), scStates: make(map[balancer.SubConn]connectivity.State),
picker: &errPicker{err: balancer.ErrNoSubConnAvailable}, picker: &errPicker{err: balancer.ErrNoSubConnAvailable},
clientStats: newRPCStats(), clientStats: newRPCStats(),
backoff: defaultBackoffConfig, // TODO: make backoff configurable.
} }
return lb return lb
@ -165,6 +180,8 @@ type lbBalancer struct {
manualResolver *lbManualResolver manualResolver *lbManualResolver
// The ClientConn to talk to the remote balancer. // The ClientConn to talk to the remote balancer.
ccRemoteLB *grpc.ClientConn ccRemoteLB *grpc.ClientConn
// backoff for calling remote balancer.
backoff backoff.Strategy
// Support client side load reporting. Each picker gets a reference to this, // Support client side load reporting. Each picker gets a reference to this,
// and will update its content. // and will update its content.

View File

@ -179,13 +179,13 @@ func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.D
} }
} }
func (lb *lbBalancer) callRemoteBalancer() error { func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) {
lbClient := &loadBalancerClient{cc: lb.ccRemoteLB} lbClient := &loadBalancerClient{cc: lb.ccRemoteLB}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
stream, err := lbClient.BalanceLoad(ctx, grpc.FailFast(false)) stream, err := lbClient.BalanceLoad(ctx, grpc.FailFast(false))
if err != nil { if err != nil {
return fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err) return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
} }
// grpclb handshake on the stream. // grpclb handshake on the stream.
@ -197,18 +197,18 @@ func (lb *lbBalancer) callRemoteBalancer() error {
}, },
} }
if err := stream.Send(initReq); err != nil { if err := stream.Send(initReq); err != nil {
return fmt.Errorf("grpclb: failed to send init request: %v", err) return true, fmt.Errorf("grpclb: failed to send init request: %v", err)
} }
reply, err := stream.Recv() reply, err := stream.Recv()
if err != nil { if err != nil {
return fmt.Errorf("grpclb: failed to recv init response: %v", err) return true, fmt.Errorf("grpclb: failed to recv init response: %v", err)
} }
initResp := reply.GetInitialResponse() initResp := reply.GetInitialResponse()
if initResp == nil { if initResp == nil {
return fmt.Errorf("grpclb: reply from remote balancer did not include initial response") return true, fmt.Errorf("grpclb: reply from remote balancer did not include initial response")
} }
if initResp.LoadBalancerDelegate != "" { if initResp.LoadBalancerDelegate != "" {
return fmt.Errorf("grpclb: Delegation is not supported") return true, fmt.Errorf("grpclb: Delegation is not supported")
} }
go func() { go func() {
@ -216,12 +216,14 @@ func (lb *lbBalancer) callRemoteBalancer() error {
lb.sendLoadReport(stream, d) lb.sendLoadReport(stream, d)
} }
}() }()
return lb.readServerList(stream) // No backoff if init req/resp handshake was successful.
return false, lb.readServerList(stream)
} }
func (lb *lbBalancer) watchRemoteBalancer() { func (lb *lbBalancer) watchRemoteBalancer() {
var retryCount int
for { for {
err := lb.callRemoteBalancer() doBackoff, err := lb.callRemoteBalancer()
select { select {
case <-lb.doneCh: case <-lb.doneCh:
return return
@ -231,6 +233,19 @@ func (lb *lbBalancer) watchRemoteBalancer() {
} }
} }
if !doBackoff {
retryCount = 0
continue
}
timer := time.NewTimer(lb.backoff.Backoff(retryCount))
select {
case <-timer.C:
case <-lb.doneCh:
timer.Stop()
return
}
retryCount++
} }
} }