Make non-failfast RPC get new transport instead of waiting
This commit is contained in:
@ -43,7 +43,6 @@ import (
|
|||||||
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
"golang.org/x/net/trace"
|
"golang.org/x/net/trace"
|
||||||
"google.golang.org/grpc/codes"
|
|
||||||
"google.golang.org/grpc/credentials"
|
"google.golang.org/grpc/credentials"
|
||||||
"google.golang.org/grpc/grpclog"
|
"google.golang.org/grpc/grpclog"
|
||||||
"google.golang.org/grpc/transport"
|
"google.golang.org/grpc/transport"
|
||||||
@ -502,7 +501,11 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
|
|||||||
}
|
}
|
||||||
return nil, nil, errConnClosing
|
return nil, nil, errConnClosing
|
||||||
}
|
}
|
||||||
t, err := ac.wait(ctx, !opts.BlockingWait)
|
// ac.wait should block on transient failure only if balancer is nil and RPC is non-failfast.
|
||||||
|
// - If RPC is failfast, ac.wait should not block.
|
||||||
|
// - If balancer is not nil, ac.wait should return errConnClosing on transient failure
|
||||||
|
// so that non-failfast RPCs will try to get a new transport instead of waiting on ac.
|
||||||
|
t, err := ac.wait(ctx, cc.dopts.balancer == nil && opts.BlockingWait)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if put != nil {
|
if put != nil {
|
||||||
put()
|
put()
|
||||||
@ -754,8 +757,8 @@ func (ac *addrConn) transportMonitor() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
|
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
|
||||||
// iv) transport is in TransientFailure and the RPC is fail-fast.
|
// iv) transport is in TransientFailure and blocking is false.
|
||||||
func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTransport, error) {
|
func (ac *addrConn) wait(ctx context.Context, blocking bool) (transport.ClientTransport, error) {
|
||||||
for {
|
for {
|
||||||
ac.mu.Lock()
|
ac.mu.Lock()
|
||||||
switch {
|
switch {
|
||||||
@ -767,9 +770,9 @@ func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTr
|
|||||||
ct := ac.transport
|
ct := ac.transport
|
||||||
ac.mu.Unlock()
|
ac.mu.Unlock()
|
||||||
return ct, nil
|
return ct, nil
|
||||||
case ac.state == TransientFailure && failFast:
|
case ac.state == TransientFailure && !blocking:
|
||||||
ac.mu.Unlock()
|
ac.mu.Unlock()
|
||||||
return nil, Errorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure")
|
return nil, errConnClosing
|
||||||
default:
|
default:
|
||||||
ready := ac.ready
|
ready := ac.ready
|
||||||
if ready == nil {
|
if ready == nil {
|
||||||
|
Reference in New Issue
Block a user